This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new ab20a93a90 NIFI-11493: Defaulted dynamically modified classpath fix
ab20a93a90 is described below
commit ab20a93a90ab571df1d033db0d77e166e6ea10af
Author: Lehel Boér <[email protected]>
AuthorDate: Thu Apr 27 21:43:12 2023 +0200
NIFI-11493: Defaulted dynamically modified classpath fix
This closes #7201.
Co-authored-by: Peter Turcsanyi <[email protected]>
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi/controller/AbstractComponentNode.java | 38 +++++---
.../DefaultedDynamicallyModifyClasspath.java | 104 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../DefaultedDynamicClassPathModificationIT.java | 94 +++++++++++++++++++
4 files changed, 222 insertions(+), 15 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 5935bf7cc9..59dfa865d5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -604,6 +604,14 @@ public abstract class AbstractComponentNode implements
ComponentNode {
return getProperty(property).getEffectiveValue(getParameterContext());
}
+ private String getEffectivePropertyValueWithDefault(final
PropertyDescriptor property) {
+ String value =
getProperty(property).getEffectiveValue(getParameterContext());
+ if (value == null) {
+ value = property.getDefaultValue();
+ }
+ return value;
+ }
+
@Override
public String getRawPropertyValue(final PropertyDescriptor property) {
return getProperty(property).getRawValue();
@@ -662,23 +670,23 @@ public abstract class AbstractComponentNode implements
ComponentNode {
*/
@Override
public synchronized void reloadAdditionalResourcesIfNecessary() {
- // Components that don't have any PropertyDescriptors marked
`dynamicallyModifiesClasspath`
- // won't have the fingerprint i.e. will be null, in such cases do
nothing
- if (additionalResourcesFingerprint == null) {
- return;
- }
-
final Set<PropertyDescriptor> descriptors =
this.getProperties().keySet();
- final Set<URL> additionalUrls =
this.getAdditionalClasspathResources(descriptors);
- final String newFingerprint =
ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls,
determineClasloaderIsolationKey());
- if(!StringUtils.equals(additionalResourcesFingerprint,
newFingerprint)) {
- setAdditionalResourcesFingerprint(newFingerprint);
- try {
- logger.info("Updating classpath for " + this.componentType + "
with the ID " + this.getIdentifier());
- reload(additionalUrls);
- } catch (Exception e) {
- logger.error("Error reloading component with id " + id + ": "
+ e.getMessage(), e);
+ final boolean dynamicallyModifiesClasspath = descriptors.stream()
+ .anyMatch(PropertyDescriptor::isDynamicClasspathModifier);
+
+ if (dynamicallyModifiesClasspath) {
+ final Set<URL> additionalUrls =
this.getAdditionalClasspathResources(descriptors,
this::getEffectivePropertyValueWithDefault);
+
+ final String newFingerprint =
ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls,
determineClasloaderIsolationKey());
+ if (!StringUtils.equals(additionalResourcesFingerprint,
newFingerprint)) {
+ setAdditionalResourcesFingerprint(newFingerprint);
+ try {
+ logger.info("Updating classpath for [{}] with the ID
[{}]", this.componentType, this.getIdentifier());
+ reload(additionalUrls);
+ } catch (Exception e) {
+ logger.error("Error reloading component with id [{}]: {}",
id, e.getMessage(), e);
+ }
}
}
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
new file mode 100644
index 0000000000..673c3c34ed
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DefaultedDynamicallyModifyClasspath.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@RequiresInstanceClassLoading
+public class DefaultedDynamicallyModifyClasspath extends AbstractProcessor {
+
+ static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
+ .name("URLs to Load")
+ .description("URLs to load onto the classpath")
+ .required(false)
+ .defaultValue("lib/bootstrap/commons-lang3-3.12.0.jar")
+ .dynamicallyModifiesClasspath(true)
+ .identifiesExternalResource(ResourceCardinality.MULTIPLE,
ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
+ .build();
+
+ static final PropertyDescriptor CLASS_TO_LOAD = new
PropertyDescriptor.Builder()
+ .name("Class to Load")
+ .description("The name of the Class to load")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles are routed to this relationship if the
specified class can be loaded")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles are routed to this relationship if the
specified class cannot be loaded")
+ .build();
+
+
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Arrays.asList(URLS, CLASS_TO_LOAD);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
+ }
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String classToLoad =
context.getProperty(CLASS_TO_LOAD).getValue();
+ try {
+ final Class<?> clazz = Class.forName(classToLoad);
+ try (final OutputStream out = session.write(flowFile);
+ final OutputStreamWriter streamWriter = new
OutputStreamWriter(out);
+ final BufferedWriter writer = new
BufferedWriter(streamWriter)) {
+
+ writer.write(clazz.getName());
+ writer.newLine();
+ writer.write(clazz.getClassLoader().toString());
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final Exception e) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a7919d8331..1f278c32ac 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -21,6 +21,7 @@ org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
+org.apache.nifi.processors.tests.system.DefaultedDynamicallyModifyClasspath
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
new file mode 100644
index 0000000000..5d0c7dfeaf
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DefaultedDynamicClassPathModificationIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.processor;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+class DefaultedDynamicClassPathModificationIT extends NiFiSystemIT {
+
+ private ProcessorEntity generateFlowFileProcessor;
+ private ProcessorEntity defaultedModifyClasspathProcessor;
+
+ private ConnectionEntity defaultedModifyClasspathInputConnection;
+ private ConnectionEntity successConnection;
+ private ConnectionEntity failureConnection;
+
+ @Test
+ void testLoadsClassFromDefaultedDynamicModification() throws
NiFiClientException, IOException, InterruptedException {
+ createFlow();
+
+ // Update modify to have the appropriate URL, don't update URL to load
to let it on default value
+ final Map<String, String> propertyMap = new HashMap<>();
+ propertyMap.put("Class to Load",
"org.apache.commons.lang3.StringUtils");
+
getClientUtil().updateProcessorProperties(defaultedModifyClasspathProcessor,
propertyMap);
+
getClientUtil().waitForValidProcessor(defaultedModifyClasspathProcessor.getId());
+
+ // Create a FlowFile
+
getClientUtil().waitForValidProcessor(generateFlowFileProcessor.getId());
+ getClientUtil().startProcessor(generateFlowFileProcessor);
+ waitForQueueCount(defaultedModifyClasspathInputConnection.getId(), 1);
+
+ // Wait for a FlowFile to be routed to success
+ getClientUtil().startProcessor(defaultedModifyClasspathProcessor);
+ waitForQueueCount(successConnection.getId(), 1);
+
+ getClientUtil().stopProcessor(generateFlowFileProcessor);
+
getClientUtil().waitForStoppedProcessor(generateFlowFileProcessor.getId());
+
+ // Restart and ensure that everything works as expected after restart
+ getNiFiInstance().stop();
+ getNiFiInstance().start(true);
+
+ // Feed another FlowFile through. Upon restart, in order to modify, we
need to get the most up-to-date revision so will first fetch the Processor
+ final ProcessorEntity generateAfterRestart =
getNifiClient().getProcessorClient().getProcessor(generateFlowFileProcessor.getId());
+ getClientUtil().waitForValidProcessor(generateAfterRestart.getId());
+ getClientUtil().startProcessor(generateAfterRestart);
+
+ // Depending on whether or not the flow was written out with the
processor running, the Modify processor may or may not be running. Ensure that
it is running.
+
getClientUtil().waitForValidationCompleted(defaultedModifyClasspathProcessor);
+ final ProcessorEntity modifyAfterRestart =
getNifiClient().getProcessorClient().getProcessor(defaultedModifyClasspathProcessor.getId());
+ final String modifyRunStatus =
modifyAfterRestart.getStatus().getRunStatus();
+ if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
+ getClientUtil().startProcessor(modifyAfterRestart);
+ }
+
+ // We now expect 2 FlowFiles to be in the success route
+ waitForQueueCount(successConnection.getId(), 2);
+ }
+
+ // We have several tests running the same flow but with different
configuration. Since we need to reference the ProcessorEntities and
ConnectionEntities, we have a method
+ // that creates the flow and stores the entities are member variables
+ private void createFlow() throws NiFiClientException, IOException {
+ generateFlowFileProcessor =
getClientUtil().createProcessor("GenerateFlowFile");
+ defaultedModifyClasspathProcessor =
getClientUtil().createProcessor("DefaultedDynamicallyModifyClasspath");
+ ProcessorEntity terminateSuccess =
getClientUtil().createProcessor("TerminateFlowFile");
+ ProcessorEntity terminateFailure =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ defaultedModifyClasspathInputConnection =
getClientUtil().createConnection(generateFlowFileProcessor,
defaultedModifyClasspathProcessor, "success");
+ successConnection =
getClientUtil().createConnection(defaultedModifyClasspathProcessor,
terminateSuccess, "success");
+ failureConnection =
getClientUtil().createConnection(defaultedModifyClasspathProcessor,
terminateFailure, "failure");
+ }
+}