This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6adfb6131d NIFI-11310: Provide appropriate classpath resources to the 
ReloadComponent when a processor is terminated
6adfb6131d is described below

commit 6adfb6131db831c9fa33bf1292b7b225482b0035
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 20 15:42:34 2023 -0400

    NIFI-11310: Provide appropriate classpath resources to the ReloadComponent 
when a processor is terminated
    
    NIFI-11310: Fixed META-INF/services file that was mistakenly listing an 
extra extension point, due to rebase
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #7061
---
 .../scheduling/StandardProcessScheduler.java       |   6 +-
 .../tests/system/DynamicallyModifyClasspath.java   | 125 ++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processor/DynamicClassPathModificationIT.java  | 191 +++++++++++++++++++++
 .../cli/impl/client/nifi/ProcessorClient.java      |   2 +
 .../client/nifi/impl/JerseyProcessorClient.java    |  14 ++
 6 files changed, 337 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 14019dca2d..87a5ac558f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -56,8 +56,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
+import java.net.URL;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -440,7 +441,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated);
 
         try {
-            flowController.getReloadComponent().reload(procNode, 
procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), 
Collections.emptySet());
+            final Set<URL> additionalUrls = 
procNode.getAdditionalClasspathResources(procNode.getPropertyDescriptors());
+            flowController.getReloadComponent().reload(procNode, 
procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), 
additionalUrls);
         } catch (final ProcessorInstantiationException e) {
             // This shouldn't happen because we already have been able to 
instantiate the processor before
             LOG.error("Failed to replace instance of Processor for {} when 
terminating Processor", procNode);
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
new file mode 100644
index 0000000000..3e77c010d4
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@RequiresInstanceClassLoading
+public class DynamicallyModifyClasspath extends AbstractProcessor {
+
+    static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
+        .name("URLs to Load")
+        .description("URLs to load onto the classpath")
+        .required(false)
+        .dynamicallyModifiesClasspath(true)
+        .identifiesExternalResource(ResourceCardinality.MULTIPLE, 
ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
+        .build();
+
+    static final PropertyDescriptor CLASS_TO_LOAD = new 
PropertyDescriptor.Builder()
+        .name("Class to Load")
+        .description("The name of the Class to load")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor SLEEP_DURATION = new 
PropertyDescriptor.Builder()
+        .name("Sleep Duration")
+        .description("Amount of time to sleep in the onTrigger method")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("0 sec")
+        .build();
+
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles are routed to this relationship if the 
specified class can be loaded")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("FlowFiles are routed to this relationship if the 
specified class cannot be loaded")
+        .build();
+
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(URLS, CLASS_TO_LOAD, SLEEP_DURATION);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long sleepMillis = 
context.getProperty(SLEEP_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (sleepMillis > 0) {
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        final String classToLoad = 
context.getProperty(CLASS_TO_LOAD).getValue();
+        try {
+            final Class<?> clazz = Class.forName(classToLoad);
+            try (final OutputStream out = session.write(flowFile);
+                 final OutputStreamWriter streamWriter = new 
OutputStreamWriter(out);
+                 final BufferedWriter writer = new 
BufferedWriter(streamWriter)) {
+
+                writer.write(clazz.getName());
+                writer.newLine();
+                writer.write(clazz.getClassLoader().toString());
+            }
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b0262a6dce..83e1f6a3fd 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,6 +20,7 @@ 
org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
 org.apache.nifi.processors.tests.system.DependOnProperties
 org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
 org.apache.nifi.processors.tests.system.Duplicate
+org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
 org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
 org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
 org.apache.nifi.processors.tests.system.FakeProcessor
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
new file mode 100644
index 0000000000..a5561e40bf
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicClassPathModificationIT extends NiFiSystemIT {
+
+    private ProcessorEntity generate;
+    private ProcessorEntity modify;
+
+    private ConnectionEntity modifyInputConnection;
+    private ConnectionEntity successConnection;
+    private ConnectionEntity failureConnection;
+
+
+    @Test
+    public void testLoadsClassOnBaseClasspath() throws NiFiClientException, 
IOException, InterruptedException {
+        createFlow();
+
+        // Configure with a class that is always on the classpath
+        final Map<String, String> propertyMap = new HashMap<>();
+        propertyMap.put("Class to Load", "org.apache.nifi.flowfile.FlowFile");
+        getClientUtil().updateProcessorProperties(modify, propertyMap);
+        getClientUtil().waitForValidProcessor(modify.getId());
+
+        // Let Generate create a FlowFile
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(modifyInputConnection.getId(), 1);
+
+        // Start the processor and make sure that the FlowFile is routed to 
success
+        getClientUtil().startProcessor(modify);
+        waitForQueueCount(successConnection.getId(), 1);
+        assertEquals(0, getConnectionQueueSize(failureConnection.getId()));
+    }
+
+    @Test
+    public void testLoadsClassFromDynamicModification() throws 
NiFiClientException, IOException, InterruptedException {
+        createFlow();
+
+        final Map<String, String> propertyMap = new HashMap<>();
+        propertyMap.put("Class to Load", 
"org.apache.commons.lang3.StringUtils");
+        getClientUtil().updateProcessorProperties(modify, propertyMap);
+        getClientUtil().waitForValidProcessor(modify.getId());
+
+        // Let Generate create a FlowFile
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(modifyInputConnection.getId(), 1);
+
+        // Start the processor and we expect the FlowFile to go to failure 
because the StringUtils class should not be available
+        getClientUtil().startProcessor(modify);
+        waitForQueueCount(failureConnection.getId(), 1);
+        assertEquals(0, getConnectionQueueSize(successConnection.getId()));
+
+        getClientUtil().stopProcessor(modify);
+        getClientUtil().stopProcessor(generate);
+
+        getClientUtil().waitForStoppedProcessor(modify.getId());
+        getClientUtil().waitForStoppedProcessor(generate.getId());
+
+        // Update modify to have the appropriate URL
+        propertyMap.put("URLs to Load", 
getCommonsLangJar().toURI().toURL().toString());
+        getClientUtil().updateProcessorProperties(modify, propertyMap);
+        getClientUtil().waitForValidProcessor(modify.getId());
+
+        // Let Generate create another FlowFile
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(modifyInputConnection.getId(), 1);
+
+        // Wait for a FlowFile to be routed to success
+        getClientUtil().startProcessor(modify);
+        waitForQueueCount(successConnection.getId(), 1);
+
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().waitForStoppedProcessor(generate.getId());
+
+        // Restart and ensure that everything works as expected after restart
+        getNiFiInstance().stop();
+        getNiFiInstance().start(true);
+
+        // Feed another FlowFile through. Upon restart, in order to modify, we 
need to get the most up-to-date revision so will first fetch the Processor
+        final ProcessorEntity generateAfterRestart = 
getNifiClient().getProcessorClient().getProcessor(generate.getId());
+        getClientUtil().startProcessor(generateAfterRestart);
+
+        // Depending on whether or not the flow was written out with the 
processor running, the Modify processor may or may not be running. Ensure that 
it is running.
+        getClientUtil().waitForValidationCompleted(modify);
+        final ProcessorEntity modifyAfterRestart = 
getNifiClient().getProcessorClient().getProcessor(modify.getId());
+        final String modifyRunStatus = 
modifyAfterRestart.getStatus().getRunStatus();
+        if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
+            getClientUtil().startProcessor(modifyAfterRestart);
+        }
+
+        // We now expect 2 FlowFiles to be in the success route
+        waitForQueueCount(successConnection.getId(), 2);
+    }
+
+
+    @Test
+    public void testSuccessAfterTerminate() throws NiFiClientException, 
IOException, InterruptedException {
+        createFlow();
+
+        // Configure so that the processor should succeed but sleep for 5 mins 
so that we can terminate it
+        final Map<String, String> propertyMap = new HashMap<>();
+        propertyMap.put("Class to Load", 
"org.apache.commons.lang3.StringUtils");
+        propertyMap.put("URLs to Load", 
getCommonsLangJar().toURI().toURL().toString());
+        propertyMap.put("Sleep Duration", "${sleep}");
+        getClientUtil().updateProcessorProperties(modify, propertyMap);
+        getClientUtil().waitForValidProcessor(modify.getId());
+
+        // Tell Generate Processor to add an attribute named 'sleep' with 5 
mins as the value
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("sleep", "5 mins"));
+        getClientUtil().waitForValidProcessor(generate.getId());
+
+        // Let Generate create a FlowFile
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(modifyInputConnection.getId(), 1);
+
+        // Start the processor, wait a bit, stop it and terminate it.
+        getClientUtil().startProcessor(modify);
+        Thread.sleep(2000L);
+        getNifiClient().getProcessorClient().stopProcessor(modify);
+        
getNifiClient().getProcessorClient().terminateProcessor(modify.getId());
+        getClientUtil().waitForStoppedProcessor(modify.getId());
+
+        // Empty the queue and generate another FlowFile with a sleep of 0 sec
+        getClientUtil().emptyQueue(modifyInputConnection.getId());
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("sleep", "0 sec"));
+        getClientUtil().waitForValidProcessor(generate.getId());
+        getClientUtil().startProcessor(generate);
+
+        // Start processor and expect data to go to 'success'. This time the 
processor will not sleep in onTrigger because
+        // it is configured to sleep only on the first iteration after an 
update.
+        getClientUtil().startProcessor(modify);
+        waitForQueueCount(successConnection.getId(), 1);
+    }
+
+    private File getCommonsLangJar() {
+        final File bootstrapLib = new 
File(getNiFiInstance().getInstanceDirectory(), "lib/bootstrap");
+        final File[] commonsLangJars = bootstrapLib.listFiles(file -> 
file.getName().startsWith("commons-lang"));
+        if (commonsLangJars == null || commonsLangJars.length == 0) {
+            throw new IllegalStateException("Could not find commons-lang jar 
in bootstrap lib directory");
+        }
+
+        if (commonsLangJars.length > 1) {
+            throw new IllegalStateException("Found multiple commons-lang jars 
in bootstrap lib directory");
+        }
+
+        return commonsLangJars[0];
+    }
+
+    // We have several tests running the same flow but with different 
configuration. Since we need to reference the ProcessorEntities and 
ConnectionEntities, we have a method
+    // that creates the flow and stores the entities are member variables
+    private void createFlow() throws NiFiClientException, IOException {
+        generate = getClientUtil().createProcessor("GenerateFlowFile");
+        modify = getClientUtil().createProcessor("DynamicallyModifyClasspath");
+        ProcessorEntity terminateSuccess = 
getClientUtil().createProcessor("TerminateFlowFile");
+        ProcessorEntity terminateFailure = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        modifyInputConnection = getClientUtil().createConnection(generate, 
modify, "success");
+        successConnection = getClientUtil().createConnection(modify, 
terminateSuccess, "success");
+        failureConnection = getClientUtil().createConnection(modify, 
terminateFailure, "failure");
+    }
+}
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
index 1b355fbe8c..40a6c6537c 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java
@@ -57,6 +57,8 @@ public interface ProcessorClient {
 
     PropertyDescriptorEntity getPropertyDescriptor(String processorId, String 
propertyName, Boolean sensitive) throws NiFiClientException, IOException;
 
+    ProcessorEntity terminateProcessor(String processorId) throws 
NiFiClientException, IOException;
+
     /**
      * Indicates that mutable requests should indicate that the client has 
acknowledged that the node is disconnected.
      */
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
index 3de97619b7..bcd22ed046 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java
@@ -275,4 +275,18 @@ public class JerseyProcessorClient extends 
AbstractJerseyClient implements Proce
             return 
getRequestBuilder(target).get(PropertyDescriptorEntity.class);
         });
     }
+
+    @Override
+    public ProcessorEntity terminateProcessor(final String processorId) throws 
NiFiClientException, IOException {
+        Objects.requireNonNull(processorId, "Processor ID required");
+
+        return executeAction("Error terminating Processor", () -> {
+            final WebTarget target = processorTarget
+                .path("/threads")
+                .resolveTemplate("id", processorId);
+
+            return getRequestBuilder(target).delete(ProcessorEntity.class);
+        });
+    }
+
 }

Reply via email to