This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2ad9db18db NIFI-12959: Support loading Python processors from NARs
2ad9db18db is described below
commit 2ad9db18db3d0ef0a1c0e3a9241e418299de78c3
Author: Mark Payne <[email protected]>
AuthorDate: Tue Mar 26 10:55:02 2024 -0400
NIFI-12959: Support loading Python processors from NARs
This closes #8573
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/util/NiFiProperties.java | 7 +-
.../src/main/asciidoc/python-developer-guide.adoc | 88 ++++++++++-------
.../components/ClassLoaderAwarePythonBridge.java | 4 +-
.../org/apache/nifi/controller/FlowController.java | 15 ++-
.../nifi/nar/ExtensionDiscoveringManager.java | 9 +-
.../nar/StandardExtensionDiscoveringManager.java | 22 +++--
.../nifi/py4j/ProcessorCreationWorkflow.java | 13 +++
.../java/org/apache/nifi/py4j/PythonProcess.java | 53 ++++++++--
.../org/apache/nifi/py4j/StandardPythonBridge.java | 51 ++++++++--
.../nifi/py4j/StandardPythonProcessorBridge.java | 7 ++
.../org/apache/nifi/py4j/PythonProcessTest.java | 27 +++--
.../PythonControllerInteractionIT.java | 8 +-
.../apache/nifi/python/DisabledPythonBridge.java | 2 +-
.../java/org/apache/nifi/python/PythonBridge.java | 4 +-
.../apache/nifi/python/PythonProcessConfig.java | 16 ++-
.../apache/nifi/python/PythonProcessorDetails.java | 12 +++
.../src/main/python/framework/Controller.py | 2 -
.../src/main/python/framework/ExtensionDetails.py | 10 ++
.../src/main/python/framework/ExtensionManager.py | 32 +++---
.../main/python/framework/ProcessorInspection.py | 15 ++-
.../python/framework/TestPythonProcessorAdapter.py | 5 +-
.../nifi-python-test-extensions-nar/pom.xml | 84 ++++++++++++++++
.../src/main/resources/WriteBech32Charset.py | 32 ++++++
nifi-system-tests/nifi-system-test-suite/pom.xml | 7 ++
.../src/test/assembly/dependencies.xml | 12 +++
.../nifi/tests/system/python/PythonNarIT.java | 110 +++++++++++++++++++++
nifi-system-tests/pom.xml | 1 +
27 files changed, 541 insertions(+), 107 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 2e3829fe36..8932760b14 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.util;
+import org.apache.nifi.properties.ApplicationProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -36,9 +40,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.nifi.properties.ApplicationProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The NiFiProperties class holds all properties which are needed for various
diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
index 8e52375e99..b798fb6d0f 100644
--- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc
@@ -516,37 +516,6 @@ environment for each Processor implementation (not for
each instance of a Proces
dependencies in that environment.
-[[deploying]]
-== Deploying a Developed Processor
-
-Once a Processor has been developed, it can be made available in NiFi by
copying the source of the Python extension to the
`$NIFI_HOME/python/extensions` directory by default.
-The actual directory to look for extensions can be configured in
`nifi.properties` via properties that have the prefix
`nifi.python.extensions.source.directory.`.
-For example, by default, `nifi.python.extensions.source.directory.default` is
set to `./python/extensions`. However, additional paths may be added by
replacing `default`
-in the property name with some other value.
-
-Any `.py` file found in the directory will be parsed and examined in order to
determine whether or not it is a valid NiFi Processor.
-In order to be found, the Processor must have a valid parent
(`FlowFileTransform` or `RecordTransform`) and must have an inner class named
`Java`
-with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']`
or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`.
-This will allow NiFi to automatically discover the Processor.
-
-Note, however, that if the Processor implementation is broken into multiple
Python modules, those modules will not be made available by default. In order
-to package a Processor along with its modules, the Processor and any related
module must be added to a directory that is directly below the Extensions
directory.
-For example, if the `WriteNumber.py` file contains a NiFi Processor and also
depends on the `ProcessorUtil.py` module, the directory structure would look
like this:
-----
-NIFI_HOME/
- - python/
- - extensions/
- ProcessorA.py
- ProcessorB.py
- write-number/
- __init__.py
- ProcessorUtils.py
- WriteNumber.py
-----
-By packaging them together in a subdirectory, NiFi knows to expose the modules
to one another. However, the ProcessorA module will have no access
-to the `ProcessorUtils` module. Only `WriteNumber` will have access to it.
-
-
[[reloading]]
== Processor Reloading
@@ -623,15 +592,66 @@ Here, we accept any version of `pandas` (though the
latest is preferred), and we
[[dependency-isolation]]
=== Dependency Isolation
-On startup, NiFi will create a separate Python env (venv) for each Processor
implementation and will use `pip` to install
-the specified dependencies from PyPI only into the appropriate Python
environment for that Processor. Therefore, dependencies of one
-Processor are not made available to another Processor.
+The first time that a user creates a NiFi Processor of a given type, NiFi will
create a separate Python env (venv) for the Processor.
+It will use `pip` to install the specified dependencies from PyPI only into
the appropriate Python environment for that Processor.
+Therefore, dependencies of one Processor are not made available to another
Processor.
Beyond that, dependencies of one version of a Processor are not made available
to other versions of the Processor. So, for example,
if we have two different versions of the same Processor made available,
version `0.0.1` and version `0.0.2`, the dependencies that are
necessary for version `0.0.1` will not be made available to version `0.0.2`
unless version `0.0.2` of the Processor also declares
those dependencies.
+Some environments, however, cannot make use of `pip` for package management.
In an air-gapped environment, for example, or in
+environments with strict security policies in place, `pip` may not be
available. In such a case, Python processors can be packaged
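+using the NiFi ARchive (NAR) format. A NAR is a ZIP file that uses the `.nar` filename extension and has the following
specific layout, in which the Processor's Python source sits at the root of the archive and its dependencies are placed under `NAR-INF/bundled-dependencies`: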
+
+```
+my-nar-bundle.nar
++-- META-INF/
+    +-- MANIFEST.MF
++-- NAR-INF/
+    +-- bundled-dependencies/
+        +-- dependency1
+        +-- dependency2
+        +-- ...
+        +-- dependencyN
++-- MyProcessor.py
+```
+
+
+[[deploying]]
+== Deploying a Developed Processor
+
+Once a Processor has been developed, it can be made available in NiFi using
one of two methods.
+For Processors that have been packaged as a NAR file, the NAR file should be
copied to NiFi's `lib/` directory or to a configured extensions directory.
+For Processors that are not pre-packaged as a NAR, the Processor is deployed
by copying the source of the Python extension to the
`$NIFI_HOME/python/extensions` directory by default.
+The actual directory to look for extensions can be configured in
`nifi.properties` via properties that have the prefix
`nifi.python.extensions.source.directory.`.
+For example, by default, `nifi.python.extensions.source.directory.default` is
set to `./python/extensions`. However, additional paths may be added by
replacing `default`
+in the property name with some other value.
+
+Any `.py` file found in the directory will be parsed and examined in order to
determine whether or not it is a valid NiFi Processor.
+In order to be found, the Processor must have a valid parent
(`FlowFileTransform` or `RecordTransform`) and must have an inner class named
`Java`
+with an `implements = ['org.apache.nifi.python.processor.FlowFileTransform']`
or `implements = ['org.apache.nifi.python.processor.RecordTransform']`.
+This will allow NiFi to automatically discover the Processor.
+
+Note, however, that if the Processor implementation is broken into multiple
Python modules, those modules will not be made available by default. In order
+to package a Processor along with its modules, the Processor and any related
module must be added to a directory that is directly below the Extensions
directory.
+For example, if the `WriteNumber.py` file contains a NiFi Processor and also
depends on the `ProcessorUtils.py` module, the directory structure would look
like this:
+----
+NIFI_HOME/
+ - python/
+ - extensions/
+ ProcessorA.py
+ ProcessorB.py
+ write-number/
+ __init__.py
+ ProcessorUtils.py
+ WriteNumber.py
+----
+By packaging them together in a subdirectory, NiFi knows to expose the modules
to one another. However, the ProcessorA module will have no access
+to the `ProcessorUtils` module. Only `WriteNumber` will have access to it.
+
+
[[troubleshooting]]
== Troubleshooting
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
index 5679a8f45b..3546ca7ece 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
@@ -89,9 +89,9 @@ public class ClassLoaderAwarePythonBridge implements
PythonBridge {
}
@Override
- public void discoverExtensions() {
+ public void discoverExtensions(final boolean includeNarDirectories) {
try (final NarCloseable narCloseable =
NarCloseable.withComponentNarLoader(classLoader)) {
- delegate.discoverExtensions();
+ delegate.discoverExtensions(includeNarDirectories);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d3eb8644cc..8c28496a4d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -102,8 +102,8 @@ import
org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.CronSchedulingAgent;
+import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
@@ -216,8 +216,6 @@ import
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.NotificationEmitter;
-import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
@@ -250,6 +248,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.management.NotificationEmitter;
+import javax.net.ssl.SSLContext;
import static java.util.Objects.requireNonNull;
@@ -900,10 +900,19 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
maxProcessesPerType = maxProcesses;
}
+ final List<File> narDirectories = new ArrayList<>();
+ for (final org.apache.nifi.bundle.Bundle bundle :
extensionManager.getAllBundles()) {
+ final File workingDir =
bundle.getBundleDetails().getWorkingDirectory();
+ if (workingDir.exists()) {
+ narDirectories.add(workingDir);
+ }
+ }
+
final PythonProcessConfig pythonProcessConfig = new
PythonProcessConfig.Builder()
.pythonCommand(pythonCommand)
.pythonFrameworkDirectory(pythonFrameworkSourceDirectory)
.pythonExtensionsDirectories(pythonExtensionsDirectories)
+ .narDirectories(narDirectories)
.pythonWorkingDirectory(pythonWorkingDirectory)
.commsTimeout(commsTimeout == null ? null :
Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout,
TimeUnit.MILLISECONDS)))
.maxPythonProcesses(maxProcesses)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java
index 1de8da3a00..0c67e92e9f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionDiscoveringManager.java
@@ -64,10 +64,15 @@ public interface ExtensionDiscoveringManager extends
ExtensionManager {
void setPythonBridge(PythonBridge pythonBridge);
/**
- * Discovers any Python based extensions using the given Python Bridge
- * @param pythonBundle the system bundle
+ * Discovers any Python based extensions that exist in either the Python
extensions directories or NAR bundles that have been expanded.
+ * @param pythonBundle the python bundle
*/
void discoverPythonExtensions(Bundle pythonBundle);
+ /**
+ * Discovers any new Python based extensions that have been added. This
method will scan only the Python extension directories
+ * that have been configured and will not include scanning NAR bundles.
+ * @param pythonBundle the python bundle
+ */
void discoverNewPythonExtensions(Bundle pythonBundle);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index 3931f46898..fe9b2cc002 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -75,7 +75,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
/**
* Scans through the classpath to load all FlowFileProcessors,
FlowFileComparators, and ReportingTasks using the service provider API and
running through all classloaders (root, NARs).
@@ -136,9 +135,7 @@ public class StandardExtensionDiscoveringManager implements
ExtensionDiscovering
@Override
public Set<Bundle> getAllBundles() {
- return classNameBundleLookup.values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toSet());
+ return new HashSet<>(bundleCoordinateBundleLookup.values());
}
@Override
@@ -186,10 +183,20 @@ public class StandardExtensionDiscoveringManager
implements ExtensionDiscovering
@Override
public void discoverPythonExtensions(final Bundle pythonBundle) {
+ discoverPythonExtensions(pythonBundle, true);
+ }
+
+ @Override
+ public void discoverNewPythonExtensions(final Bundle pythonBundle) {
+ logger.debug("Scanning to discover new Python extensions...");
+ discoverPythonExtensions(pythonBundle, false);
+ }
+
+ private void discoverPythonExtensions(final Bundle pythonBundle, final
boolean includeNarBundles) {
logger.debug("Scanning to discover which Python extensions are
available and importing any necessary dependencies. If new components are
discovered, this may take a few minutes. " +
"See python logs for more details.");
final long start = System.currentTimeMillis();
- pythonBridge.discoverExtensions();
+ pythonBridge.discoverExtensions(includeNarBundles);
bundleCoordinateBundleLookup.putIfAbsent(pythonBundle.getBundleDetails().getCoordinate(),
pythonBundle);
@@ -269,11 +276,6 @@ public class StandardExtensionDiscoveringManager
implements ExtensionDiscovering
.build();
}
- @Override
- public void discoverNewPythonExtensions(final Bundle pythonBundle) {
- logger.debug("Scanning to discover new Python extensions...");
- discoverPythonExtensions(pythonBundle);
- }
/**
* Loads extensions from the specified bundle.
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java
index 7b99f146f5..cf3580ca74 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/ProcessorCreationWorkflow.java
@@ -21,8 +21,21 @@ import
org.apache.nifi.python.processor.PythonProcessorAdapter;
public interface ProcessorCreationWorkflow {
+ /**
+ * @return <code>true</code> if the Processor has been packaged along with
its dependencies, <code>false</code> otherwise
+ */
+ boolean isPackagedWithDependencies();
+
+ /**
+ * Downloads any dependencies required by the Processor using
<code>pip</code>.
+ * If the Processor is already packaged with its dependencies, this method
does nothing.
+ */
void downloadDependencies();
+ /**
+ * Creates the Processor on the Python side and returns an adapter for
interacting with the Processor from the Java side.
+ * @return an adapter for interacting with the Python Processor
+ */
PythonProcessorAdapter createProcessor();
}
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java
index 2e4779bfab..0dcb82e1bc 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java
@@ -17,13 +17,13 @@
package org.apache.nifi.py4j;
-import org.apache.nifi.py4j.logging.LogLevelChangeListener;
-import org.apache.nifi.py4j.logging.PythonLogLevel;
-import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.py4j.client.JavaObjectBindings;
import org.apache.nifi.py4j.client.NiFiPythonGateway;
import org.apache.nifi.py4j.client.StandardPythonClient;
+import org.apache.nifi.py4j.logging.LogLevelChangeListener;
+import org.apache.nifi.py4j.logging.PythonLogLevel;
+import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.py4j.server.NiFiGatewayServer;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonController;
@@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory;
import py4j.CallbackClient;
import py4j.GatewayServer;
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -52,6 +50,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
public class PythonProcess {
private static final Logger logger =
LoggerFactory.getLogger(PythonProcess.class);
@@ -62,6 +62,7 @@ public class PythonProcess {
private final PythonProcessConfig processConfig;
private final ControllerServiceTypeLookup controllerServiceTypeLookup;
private final File virtualEnvHome;
+ private final boolean packagedWithDependencies;
private final String componentType;
private final String componentId;
private GatewayServer server;
@@ -78,10 +79,11 @@ public class PythonProcess {
public PythonProcess(final PythonProcessConfig processConfig, final
ControllerServiceTypeLookup controllerServiceTypeLookup, final File
virtualEnvHome,
- final String componentType, final String componentId)
{
+ final boolean packagedWithDependencies, final String
componentType, final String componentId) {
this.processConfig = processConfig;
this.controllerServiceTypeLookup = controllerServiceTypeLookup;
this.virtualEnvHome = virtualEnvHome;
+ this.packagedWithDependencies = packagedWithDependencies;
this.componentType = componentType;
this.componentId = componentId;
}
@@ -224,6 +226,10 @@ public class PythonProcess {
return Base64.getEncoder().encodeToString(bytes);
}
+ private boolean isPackagedWithDependencies() {
+ return packagedWithDependencies;
+ }
+
private Process launchPythonProcess(final int listeningPort, final String
authToken) throws IOException {
final File pythonFrameworkDirectory =
processConfig.getPythonFrameworkDirectory();
final File pythonApiDirectory = new
File(pythonFrameworkDirectory.getParentFile(), "api");
@@ -234,8 +240,21 @@ public class PythonProcess {
final List<String> commands = new ArrayList<>();
commands.add(pythonCommand);
+ if (isPackagedWithDependencies()) {
+ // If not using venv, we will not launch a separate virtual
environment, so we need to use the -S
+ // flag in order to prevent the Python process from using the
installation's site-packages. This provides
+ // proper dependency isolation to the Python process.
+ commands.add("-S");
+ }
String pythonPath = pythonApiDirectory.getAbsolutePath();
+ final String absolutePath = virtualEnvHome.getAbsolutePath();
+ pythonPath = pythonPath + File.pathSeparator + absolutePath;
+
+ if (isPackagedWithDependencies()) {
+ final File dependenciesDir = new File(new File(absolutePath),
"NAR-INF/bundled-dependencies");
+ pythonPath = pythonPath + File.pathSeparator +
dependenciesDir.getAbsolutePath();
+ }
if (processConfig.isDebugController() &&
"Controller".equals(componentId)) {
commands.add("-m");
@@ -264,7 +283,13 @@ public class PythonProcess {
return processBuilder.start();
}
+ // Visible for testing
String resolvePythonCommand() throws IOException {
+ // If pip is disabled, we will not create separate virtual
environments for each Processor and thus we will use the configured Python
command
+ if (isPackagedWithDependencies()) {
+ return processConfig.getPythonCommand();
+ }
+
final File pythonCmdFile = new File(processConfig.getPythonCommand());
final String pythonCmd = pythonCmdFile.getName();
@@ -287,6 +312,13 @@ public class PythonProcess {
private void setupEnvironment() throws IOException {
+ // Environment creation is only necessary if using PIP. Otherwise, the
Process requires no outside dependencies, other than those
+ // provided in the package and thus we can simply include those
packages in the PYTHON_PATH.
+ if (isPackagedWithDependencies()) {
+ logger.debug("Will not create Python Virtual Environment because
Python Processor packaged with dependencies");
+ return;
+ }
+
final File environmentCreationCompleteFile = new File(virtualEnvHome,
"env-creation-complete.txt");
if (environmentCreationCompleteFile.exists()) {
logger.debug("Environment has already been created for {}; will
not recreate", virtualEnvHome);
@@ -408,8 +440,17 @@ public class PythonProcess {
public PythonProcessorBridge createProcessor(final String identifier,
final String type, final String version, final String workDirPath) {
final ProcessorCreationWorkflow creationWorkflow = new
ProcessorCreationWorkflow() {
+ @Override
+ public boolean isPackagedWithDependencies() {
+ return packagedWithDependencies;
+ }
+
@Override
public void downloadDependencies() {
+ if (packagedWithDependencies) {
+ return;
+ }
+
controller.downloadDependencies(type, version, workDirPath);
}
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
index cc2a5d8489..dc9c0451b7 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
@@ -78,7 +78,7 @@ public class StandardPythonBridge implements PythonBridge {
LevelChangeListener.registerLogbackListener(logLevelChangeHandler);
final File envHome = new
File(processConfig.getPythonWorkingDirectory(), "controller");
- controllerProcess = new PythonProcess(processConfig,
serviceTypeLookup, envHome, "Controller", "Controller");
+ controllerProcess = new PythonProcess(processConfig,
serviceTypeLookup, envHome, true, "Controller", "Controller");
controllerProcess.start();
running = true;
} catch (final Exception e) {
@@ -88,11 +88,18 @@ public class StandardPythonBridge implements PythonBridge {
}
@Override
- public void discoverExtensions() {
+ public void discoverExtensions(final boolean includeNarDirectories) {
ensureStarted();
final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
.map(File::getAbsolutePath)
- .collect(Collectors.toList());
+ .collect(Collectors.toCollection(ArrayList::new));
+
+ if (includeNarDirectories) {
+ processConfig.getNarDirectories().stream()
+ .map(File::getAbsolutePath)
+ .forEach(extensionsDirs::add);
+ }
+
final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
controllerProcess.discoverExtensions(extensionsDirs, workDirPath);
}
@@ -104,7 +111,16 @@ public class StandardPythonBridge implements PythonBridge {
final ExtensionId extensionId = extensionIdFound.orElseThrow(() -> new
IllegalArgumentException("Processor Type [%s] Version [%s] not
found".formatted(type, version)));
logger.debug("Creating Python Processor Type [{}] Version [{}]",
extensionId.type(), extensionId.version());
- final PythonProcess pythonProcess =
getProcessForNextComponent(extensionId, identifier, preferIsolatedProcess);
+ final PythonProcessorDetails processorDetails =
getProcessorTypes().stream()
+ .filter(details -> details.getProcessorType().equals(type))
+ .filter(details -> details.getProcessorVersion().equals(version))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Could not find
Processor Details for Python Processor type [%s] or version
[%s]".formatted(type, version)));
+
+ final String processorHome = processorDetails.getExtensionHome();
+ final boolean bundledWithDependencies =
processorDetails.isBundledWithDependencies();
+
+ final PythonProcess pythonProcess =
getProcessForNextComponent(extensionId, identifier, processorHome,
preferIsolatedProcess, bundledWithDependencies);
final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
final PythonProcessorBridge processorBridge =
pythonProcess.createProcessor(identifier, type, version, workDirPath);
@@ -181,7 +197,9 @@ public class StandardPythonBridge implements PythonBridge {
return count;
}
- private synchronized PythonProcess getProcessForNextComponent(final
ExtensionId extensionId, final String componentId, final boolean
preferIsolatedProcess) {
+ private synchronized PythonProcess getProcessForNextComponent(final
ExtensionId extensionId, final String componentId, final String processorHome,
final boolean preferIsolatedProcess,
+ final boolean packagedWithDependencies) {
+
final int processorsOfThisType =
processorCountByType.getOrDefault(extensionId, 0);
final int processIndex = processorsOfThisType %
processConfig.getMaxPythonProcessesPerType();
@@ -210,15 +228,28 @@ public class StandardPythonBridge implements PythonBridge
{
logger.info("In order to create Python Processor of type {},
launching a new Python Process because there are currently {} Python Processors
of this type and {} Python Processes",
extensionId.type(), processorsOfThisType,
processesByProcessorType.size());
- final File extensionsWorkDir = new
File(processConfig.getPythonWorkingDirectory(), "extensions");
- final File componentTypeHome = new File(extensionsWorkDir,
extensionId.type());
- final File envHome = new File(componentTypeHome,
extensionId.version());
- final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome, extensionId.type(),
componentId);
+ // If the processor is packaged with its dependencies as a
NAR, we can use the Processor Home as the Environment Home.
+ // Otherwise, we need to create a Virtual Environment for the
Processor.
+ final File envHome;
+ if (packagedWithDependencies) {
+ envHome = new File(processorHome);
+ } else {
+ final File extensionsWorkDir = new
File(processConfig.getPythonWorkingDirectory(), "extensions");
+ final File componentTypeHome = new File(extensionsWorkDir,
extensionId.type());
+ envHome = new File(componentTypeHome,
extensionId.version());
+ }
+
+ final PythonProcess pythonProcess = new
PythonProcess(processConfig, serviceTypeLookup, envHome,
packagedWithDependencies, extensionId.type(), componentId);
pythonProcess.start();
+ // Create list of extensions directories, including NAR
directories
final List<String> extensionsDirs =
processConfig.getPythonExtensionsDirectories().stream()
.map(File::getAbsolutePath)
- .collect(Collectors.toList());
+ .collect(Collectors.toCollection(ArrayList::new));
+ processConfig.getNarDirectories().stream()
+ .map(File::getAbsolutePath)
+ .forEach(extensionsDirs::add);
+
final String workDirPath =
processConfig.getPythonWorkingDirectory().getAbsolutePath();
pythonProcess.discoverExtensions(extensionsDirs, workDirPath);
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
index 22aa56585e..713d12addb 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
@@ -106,11 +106,18 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
long sleepMillis = 1_000L;
while (!future.isCancelled()) {
+ final boolean packagedWithDependencies =
creationWorkflow.isPackagedWithDependencies();
+ if (packagedWithDependencies) {
+ loadState = LoadState.LOADING_PROCESSOR_CODE;
+ break;
+ }
+
loadState = LoadState.DOWNLOADING_DEPENDENCIES;
try {
creationWorkflow.downloadDependencies();
logger.info("Successfully downloaded dependencies for Python
Processor {} ({})", identifier, getProcessorType());
+
break;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java
index 4a1ad32638..8797d66eba 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/test/java/org/apache/nifi/py4j/PythonProcessTest.java
@@ -16,14 +16,6 @@
*/
package org.apache.nifi.py4j;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonProcessConfig;
import org.junit.jupiter.api.BeforeEach;
@@ -34,6 +26,14 @@ import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
@ExtendWith(MockitoExtension.class)
class PythonProcessTest {
@@ -56,7 +56,14 @@ class PythonProcessTest {
@BeforeEach
public void setUp() {
- this.pythonProcess = new PythonProcess(this.pythonProcessConfig,
this.controllerServiceTypeLookup, virtualEnvHome, "Controller", "Controller");
+ this.pythonProcess = new PythonProcess(this.pythonProcessConfig,
this.controllerServiceTypeLookup, virtualEnvHome, false, "Controller",
"Controller");
+ }
+
+ @Test
+ void testUsesConfiguredValueWhenPackagedWithDependencies() throws
IOException {
+ when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
+ final PythonProcess process = new
PythonProcess(this.pythonProcessConfig, this.controllerServiceTypeLookup,
virtualEnvHome, true, "Controller", "Controller");
+ assertEquals(PYTHON_CMD, process.resolvePythonCommand());
}
@Test
@@ -100,7 +107,7 @@ class PythonProcessTest {
@Test
void testResolvePythonCommandNone() {
when(pythonProcessConfig.getPythonCommand()).thenReturn(PYTHON_CMD);
- assertThrows(IOException.class, ()->
this.pythonProcess.resolvePythonCommand());
+ assertThrows(IOException.class, () ->
this.pythonProcess.resolvePythonCommand());
}
private String getExpectedBinaryPath(String binarySubDirectoryName) {
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 7fb0399166..8c184f22a5 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -142,7 +142,7 @@ public class PythonControllerInteractionIT {
public void testGetProcessorDetails() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.py4j",
"DEBUG");
- bridge.discoverExtensions();
+ bridge.discoverExtensions(true);
final List<PythonProcessorDetails> extensionDetails =
bridge.getProcessorTypes();
final List<String> types = extensionDetails.stream()
@@ -286,7 +286,7 @@ public class PythonControllerInteractionIT {
@Test
public void testImportRequirements() {
// Discover extensions so that they can be created
- bridge.discoverExtensions();
+ bridge.discoverExtensions(true);
final PythonProcessorDetails writeNumpyVersionDetails =
bridge.getProcessorTypes().stream()
.filter(details ->
details.getProcessorType().equals("WriteNumpyVersion"))
@@ -390,7 +390,7 @@ public class PythonControllerInteractionIT {
replaceFileText(sourceFile, "Hola, Mundo", "Hello, World");
// Discover extensions so that they can be created
- bridge.discoverExtensions();
+ bridge.discoverExtensions(true);
// Ensure that we find 2 different versions of the WriteMessage
Processor.
final List<PythonProcessorDetails> processorTypes =
bridge.getProcessorTypes();
@@ -598,7 +598,7 @@ public class PythonControllerInteractionIT {
}
private TestRunner createProcessor(final String type, final String
version) {
- bridge.discoverExtensions();
+ bridge.discoverExtensions(true);
final AsyncLoadedProcessor processor =
bridge.createProcessor(createId(), type, version, true, true);
final TestRunner runner = TestRunners.newTestRunner(processor);
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
index 2071a9c916..9380143d5c 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
@@ -60,7 +60,7 @@ public class DisabledPythonBridge implements PythonBridge {
}
@Override
- public void discoverExtensions() {
+ public void discoverExtensions(final boolean includeNarDirectories) {
}
@Override
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
index a75ebca5dd..fd54691a34 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
@@ -84,8 +84,10 @@ public interface PythonBridge {
/**
* Triggers the Python Bridge to scan in order to determine which
extensions are available. The results may then be obtained by calling
* {@link #getProcessorTypes()}.
+ *
+ * @param includeNarDirectories whether or not to include NAR directories
in the search for extensions
*/
- void discoverExtensions();
+ void discoverExtensions(boolean includeNarDirectories);
/**
* Creates a Processor with the given identifier, type, and version.
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java
index 4e32b5e89d..dfdba4c317 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessConfig.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
public class PythonProcessConfig {
@@ -29,6 +30,7 @@ public class PythonProcessConfig {
private final String pythonCommand;
private final File pythonFrameworkDirectory;
private final List<File> pythonExtensionsDirectories;
+ private final List<File> narDirectories;
private final File pythonWorkingDirectory;
private final Duration commsTimeout;
private final int maxPythonProcesses;
@@ -41,6 +43,7 @@ public class PythonProcessConfig {
this.pythonCommand = builder.pythonCommand;
this.pythonFrameworkDirectory = builder.pythonFrameworkDirectory;
this.pythonExtensionsDirectories = builder.pythonExtensionsDirectories;
+ this.narDirectories = builder.narDirectories;
this.pythonWorkingDirectory = builder.pythonWorkingDirectory;
this.commsTimeout = builder.commsTimeout;
this.maxPythonProcesses = builder.maxProcesses;
@@ -62,6 +65,10 @@ public class PythonProcessConfig {
return pythonExtensionsDirectories;
}
+ public List<File> getNarDirectories() {
+ return narDirectories;
+ }
+
public File getPythonWorkingDirectory() {
return pythonWorkingDirectory;
}
@@ -93,7 +100,8 @@ public class PythonProcessConfig {
public static class Builder {
private String pythonCommand = "python3";
private File pythonFrameworkDirectory = new File("python/framework");
- private List<File> pythonExtensionsDirectories =
Collections.singletonList(new File("python/extensions"));
+ private List<File> pythonExtensionsDirectories = List.of(new
File("python/extensions"));
+ private List<File> narDirectories = Collections.emptyList();
private File pythonWorkingDirectory = new File("python");
private Duration commsTimeout = Duration.ofSeconds(0);
private int maxProcesses;
@@ -102,6 +110,7 @@ public class PythonProcessConfig {
private String debugHost = "localhost";
private int debugPort = 5678;
+
public Builder pythonCommand(final String command) {
this.pythonCommand = command;
return this;
@@ -117,6 +126,11 @@ public class PythonProcessConfig {
return this;
}
+ public Builder narDirectories(final Collection<File> narDirectories) {
+ this.narDirectories = new ArrayList<>(new
HashSet<>(narDirectories));
+ return this;
+ }
+
public Builder pythonWorkingDirectory(final File
pythonWorkingDirectory) {
this.pythonWorkingDirectory = pythonWorkingDirectory;
return this;
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java
index 9831f9ddd3..04200c7b61 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonProcessorDetails.java
@@ -39,6 +39,18 @@ public interface PythonProcessorDetails extends
PythonObjectProxy {
*/
String getSourceLocation();
+ /**
+ * @return the directory where the Processor's extension is installed. If
the extension is a module, this will be the directory
+ * containing the module. If the extension is a single file
outside of a module, this will be the directory containing
+ * that file.
+ */
+ String getExtensionHome();
+
+ /**
+ * @return <code>true</code> if the Processor is bundled with its
dependencies; <code>false</code> otherwise
+ */
+ boolean isBundledWithDependencies();
+
/**
* @return the Processor's capability description
*/
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py
index 05e0d4f6b0..b40a444d28 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py
@@ -17,7 +17,6 @@ import logging
import os
import sys
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
-
from py4j.java_gateway import JavaGateway, CallbackServerParameters,
GatewayParameters
import ExtensionManager
@@ -40,7 +39,6 @@ logger = logging.getLogger("org.apache.nifi.py4j.Controller")
class Controller:
-
def ping(self):
return "pong"
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py
index fbab6dca5d..650612dbc6 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionDetails.py
@@ -27,6 +27,8 @@ class ExtensionDetails:
tags=None,
use_cases=None,
multi_processor_use_cases=None,
+ extension_home=None,
+ dependencies_bundled=False,
property_descriptions=None):
self.type = type
@@ -35,10 +37,12 @@ class ExtensionDetails:
self.tags = tags if tags else []
self.version = version
self.source_location = source_location
+ self.extension_home = extension_home
self.description = description
self.use_cases = use_cases if use_cases else {}
self.multi_processor_use_cases = multi_processor_use_cases if
multi_processor_use_cases else {}
self.property_descriptions = property_descriptions if
property_descriptions else {}
+ self.dependencies_bundled = dependencies_bundled
def getProcessorType(self):
return self.type
@@ -49,6 +53,9 @@ class ExtensionDetails:
def getSourceLocation(self):
return self.source_location
+ def getExtensionHome(self):
+ return self.extension_home
+
def getDependencies(self):
return ArrayList(self.dependencies)
@@ -67,6 +74,9 @@ class ExtensionDetails:
def getPropertyDescriptions(self):
return ArrayList(self.property_descriptions)
+ def isBundledWithDependencies(self):
+ return self.dependencies_bundled
+
def getInterface(self):
if len(self.interfaces) == 0:
return None
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
index 2e9063dda3..3bc6d25cd9 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py
@@ -25,7 +25,6 @@ from pathlib import Path
import ProcessorInspection
-
logger = logging.getLogger("python.ExtensionManager")
@@ -129,7 +128,9 @@ class ExtensionManager:
os.remove(completion_marker_file)
# Call load_extension to ensure that we load all necessary
dependencies, in case they have changed
- self.__gather_extension_details(module_file, work_dir)
+ dependencies_bundled = details.isBundledWithDependencies()
+ extension_home = details.getExtensionHome()
+ self.__gather_extension_details(module_file, extension_home,
dependencies_bundled, work_dir)
# Reload the processor class itself
processor_class = self.__load_extension_module(module_file,
details.local_dependencies)
@@ -179,20 +180,29 @@ class ExtensionManager:
paths = []
for path in paths:
+ # If the path has a child directory named NAR-INF, we note that it
has dependencies bundled with it
+ nar_inf_dir = os.path.join(path, 'NAR-INF')
+ dependencies_bundled = os.path.exists(nar_inf_dir)
+
for finder, name, ispkg in pkgutil.iter_modules([path]):
if not require_nifi_prefix or name.startswith('nifi_'):
module_file = '<Unknown Module File>'
try:
module = finder.find_module(name)
module_file = module.path
- logger.info('Discovered extension %s' % module_file)
- self.__gather_extension_details(module_file, work_dir)
+ # Ignore any packaged dependencies
+ if 'NAR-INF/bundled-dependencies' in module_file:
+ continue
+
+ logger.debug('Discovered extension %s' % module_file)
+
+ self.__gather_extension_details(module_file, path,
dependencies_bundled, work_dir)
except Exception:
logger.error("Failed to load Python extensions from
module file {0}. This module will be ignored.".format(module_file),
exc_info=True)
- def __gather_extension_details(self, module_file, work_dir,
local_dependencies=None):
+ def __gather_extension_details(self, module_file, extension_home,
dependencies_bundled, work_dir, local_dependencies=None):
path = Path(module_file)
basename = os.path.basename(module_file)
@@ -222,12 +232,12 @@ class ExtensionManager:
continue
child_module_file = os.path.join(dir, filename)
- self.__gather_extension_details(child_module_file, work_dir,
local_dependencies=local_dependencies)
+ self.__gather_extension_details(child_module_file,
extension_home, dependencies_bundled, work_dir,
local_dependencies=local_dependencies)
- classes_and_details =
self.__get_processor_classes_and_details(module_file)
+ classes_and_details =
self.__get_processor_classes_and_details(module_file, extension_home,
dependencies_bundled)
for classname, details in classes_and_details.items():
id = ExtensionId(classname, details.version)
- logger.info(f"For {classname} found local dependencies
{local_dependencies}")
+ logger.debug(f"For {classname} found local dependencies
{local_dependencies}")
details.local_dependencies = local_dependencies
@@ -247,13 +257,13 @@ class ExtensionManager:
id = ExtensionId(extension_type, version)
return self.processor_details[id].dependencies
- def __get_processor_classes_and_details(self, module_file):
+ def __get_processor_classes_and_details(self, module_file, extension_home,
dependencies_bundled):
class_nodes =
ProcessorInspection.get_processor_class_nodes(module_file)
details_by_class = {}
for class_node in class_nodes:
- logger.info(f"Discovered Processor class {class_node.name} in
module {module_file}")
- details = ProcessorInspection.get_processor_details(class_node,
module_file)
+ logger.debug(f"Discovered Processor class {class_node.name} in
module {module_file} with home {extension_home}")
+ details = ProcessorInspection.get_processor_details(class_node,
module_file, extension_home, dependencies_bundled=dependencies_bundled)
details_by_class[class_node.name] = details
return details_by_class
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
index c01b5a3a52..cd5fedb24a 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ProcessorInspection.py
@@ -14,11 +14,12 @@
# limitations under the License.
import ast
-import ExtensionDetails
import logging
import textwrap
from nifiapi.documentation import UseCaseDetails,
MultiProcessorUseCaseDetails, ProcessorConfiguration, PropertyDescription
+import ExtensionDetails
+
PROCESSOR_INTERFACES = ['org.apache.nifi.python.processor.FlowFileTransform',
'org.apache.nifi.python.processor.RecordTransform']
logger = logging.getLogger("python.ProcessorInspection")
@@ -37,7 +38,7 @@ def get_processor_class_nodes(module_file: str) -> list:
return processor_class_nodes
-def get_processor_details(class_node, module_file):
+def get_processor_details(class_node, module_file, extension_home,
dependencies_bundled):
# Look for a 'ProcessorDetails' class
child_class_nodes = get_class_nodes(class_node)
@@ -60,6 +61,8 @@ def get_processor_details(class_node, module_file):
version=version,
dependencies=dependencies,
source_location=module_file,
+
extension_home=extension_home,
+
dependencies_bundled=dependencies_bundled,
description=description,
tags=tags,
use_cases=use_cases,
@@ -70,7 +73,9 @@ def get_processor_details(class_node, module_file):
type=class_node.name,
version='Unknown',
dependencies=[],
- source_location=module_file)
+ source_location=module_file,
+ extension_home=extension_home,
+ dependencies_bundled=dependencies_bundled)
def __get_processor_version(details_node):
@@ -80,9 +85,9 @@ def __get_processor_version(details_node):
def __get_processor_dependencies(details_node, class_name):
deps = get_assigned_value(details_node, 'dependencies', [])
if len(deps) == 0:
- logger.info("Found no external dependencies that are required for
class %s" % class_name)
+ logger.debug("Found no external dependencies that are required for
class %s" % class_name)
else:
- logger.info("Found the following external dependencies that are
required for class {0}: {1}".format(class_name, deps))
+ logger.debug("Found the following external dependencies that are
required for class {0}: {1}".format(class_name, deps))
return deps
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py
index c1075d6078..b8b9b9038b 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/test/python/framework/TestPythonProcessorAdapter.py
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
import ProcessorInspection
+import unittest
DUMMY_PROCESSOR_FILE = 'src/test/python/framework/DummyProcessor.py'
@@ -26,10 +26,11 @@ class DetectProcessorUseCase(unittest.TestCase):
class_node = class_nodes[0]
self.assertEqual(class_node.name, 'DummyProcessor')
- details = ProcessorInspection.get_processor_details(class_node,
DUMMY_PROCESSOR_FILE)
+ details = ProcessorInspection.get_processor_details(class_node,
DUMMY_PROCESSOR_FILE, '/extensions/dummy_processor', False)
self.assertIsNotNone(details)
self.assertEqual(details.description, 'Fake Processor')
self.assertEqual(details.tags, ['tag1', 'tag2'])
+ self.assertEqual(details.extension_home, '/extensions/dummy_processor')
self.assertEqual(len(details.use_cases), 2)
self.assertEqual(details.use_cases[0].description, 'First Use Case')
self.assertEqual(details.use_cases[1].description, 'Second Use Case')
diff --git a/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml
b/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml
new file mode 100644
index 0000000000..8736a90dcc
--- /dev/null
+++ b/nifi-system-tests/nifi-python-test-extensions-nar/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-system-tests</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-python-test-extensions-nar</artifactId>
+ <packaging>nar</packaging>
+
+ <properties>
+ <bech32.version>1.2.0</bech32.version>
+
<bech32.url>https://files.pythonhosted.org/packages/b6/41/7022a226e5a6ac7091a95ba36bad057012ab7330b9894ad4e14e31d0b858/bech32-1.2.0-py3-none-any.whl</bech32.url>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.googlecode.maven-download-plugin</groupId>
+ <artifactId>download-maven-plugin</artifactId>
+ <version>1.7.1</version>
+ <executions>
+ <execution>
+ <id>download-bech32</id>
+ <goals>
+ <goal>wget</goal>
+ </goals>
+ <phase>generate-resources</phase>
+ <configuration>
+ <url>${bech32.url}</url>
+
<outputFileName>bech32-${bech32.version}.zip</outputFileName>
+ <unpack>true</unpack>
+
<outputDirectory>${project.build.directory}/classes/NAR-INF/bundled-dependencies</outputDirectory>
+
<sha256>990dc8e5a5e4feabbdf55207b5315fdd9b73db40be294a19b3752cde9e79d981</sha256>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <configuration>
+ <includeEmptyDirs>false</includeEmptyDirs>
+
<outputDirectory>${project.build.outputDirectory}</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <includes>
+ <include>*.py</include>
+ </includes>
+ </resource>
+ </resources>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py
b/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py
new file mode 100644
index 0000000000..363ecb6a6b
--- /dev/null
+++
b/nifi-system-tests/nifi-python-test-extensions-nar/src/main/resources/WriteBech32Charset.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import bech32
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+
+
+# A simple processor that demonstrates the ability to import and use a
third-party library that is not declared as a dependency in ProcessorDetails
+# but that is bundled within the NAR file. This processor uses the bech32
library to return the bech32.CHARSET value.
+class WriteBech32Charset(FlowFileTransform):  # FlowFileTransform that writes bech32.CHARSET as the FlowFile content
+ class Java:
+ implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+ class ProcessorDetails:  # no 'dependencies' declared; bech32 is expected to come from the NAR's NAR-INF/bundled-dependencies
+ version = '0.0.1-SNAPSHOT'
+
+ def __init__(self, **kwargs):
+ pass  # no state to initialize; keyword arguments are accepted and ignored
+
+ def transform(self, context, flowFile):
+ return FlowFileTransformResult(relationship = "success", contents =
bech32.CHARSET)  # always routes to "success"; incoming content is not read
diff --git a/nifi-system-tests/nifi-system-test-suite/pom.xml
b/nifi-system-tests/nifi-system-test-suite/pom.xml
index 61a69646c1..a4a650b6b8 100644
--- a/nifi-system-tests/nifi-system-test-suite/pom.xml
+++ b/nifi-system-tests/nifi-system-test-suite/pom.xml
@@ -40,6 +40,7 @@
<configuration>
<excludes>
<exclude>PythonProcessorIT.java</exclude>
+ <exclude>PythonNarIT.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -323,6 +324,12 @@
<artifactId>nifi-python-test-extensions</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-python-test-extensions-nar</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
index 26558507c2..9662e6400f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
@@ -118,6 +118,18 @@
</unpackOptions>
</dependencySet>
+ <dependencySet>
+ <scope>runtime</scope>
+ <useProjectArtifact>false</useProjectArtifact>
+ <outputDirectory>lib/python-nars</outputDirectory>
+ <directoryMode>0770</directoryMode>
+ <fileMode>0664</fileMode>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <unpack>false</unpack>
+ <includes>
+ <include>*:nifi-python-test-extensions-nar</include>
+ </includes>
+ </dependencySet>
</dependencySets>
</assembly>
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java
new file mode 100644
index 0000000000..6090bd0d07
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonNarIT.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.python;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PythonNarIT extends NiFiSystemIT { // System test: a Python processor packaged in a NAR with bundled dependencies can be loaded and run
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createPythonicInstanceFactory();
+ }
+
+ @Override
+ protected boolean isAllowFactoryReuse() {
+ return false; // NOTE(review): presumably disabled because this test modifies the instance's lib and python/extensions directories
+ }
+
+ @Test
+ public void testRunProcessorWithBundledDependencies() throws IOException,
NiFiClientException, InterruptedException {
+ final File nifiHome = getNiFiInstance().getInstanceDirectory();
+ final File lib = new File(nifiHome, "lib");
+ final File pythonNars = new File(lib, "python-nars");
+ final File[] narFiles = pythonNars.listFiles((dir, name) ->
name.endsWith(".nar"));
+ assertNotNull(narFiles);
+
+ // Copy the Python NAR files from lib/python-nars into the lib directory so that they will be
loaded on restart.
+ for (final File narFile : narFiles) {
+ Files.copy(narFile.toPath(),
lib.toPath().resolve(narFile.getName()), StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ // Delete all Python extensions from the python/extensions directory
to ensure they are not loaded and the processor must come from a NAR.
+ final File pythonExtensions = new File(nifiHome, "python/extensions");
+ final File[] extensionFiles = pythonExtensions.listFiles();
+ assertNotNull(extensionFiles);
+ for (final File extensionFile : extensionFiles) {
+ deleteRecursively(extensionFile);
+ }
+
+ // Restart NiFi so that the copied NARs are loaded
+ getNiFiInstance().stop();
+ getNiFiInstance().start(true);
+
+ // Create an instance of the WriteBech32Charset
processor, and connect a GenerateFlowFile to it
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ ProcessorEntity writeVersion =
getClientUtil().createPythonProcessor("WriteBech32Charset");
+ writeVersion =
getClientUtil().setAutoTerminatedRelationships(writeVersion, "failure");
+ final ConnectionEntity generateToWriteVersion =
getClientUtil().createConnection(generate, writeVersion, "success");
+
+ // Create a TerminateFlowFile processor and connect the
WriteBech32Charset processor to it
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+ final ConnectionEntity writeVersionToTerminate =
getClientUtil().createConnection(writeVersion, terminate, "success");
+
+ // Wait for processor validation to complete
+ getClientUtil().waitForValidProcessor(generate.getId());
+ getClientUtil().waitForValidProcessor(writeVersion.getId());
+
+ // Run the flow: queue one FlowFile first, then start the Python processor
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(generateToWriteVersion.getId(), 1);
+ getClientUtil().startProcessor(writeVersion);
+ waitForQueueCount(writeVersionToTerminate.getId(), 1);
+
+ // Verify the output
+ final String contents =
getClientUtil().getFlowFileContentAsUtf8(writeVersionToTerminate.getId(), 0);
+ // Ensure that the contents written to the FlowFile are the 32
characters used by the bech32 encoding (bech32.CHARSET)
+ assertEquals("qpzry9x8gf2tvdw0s3jn54khce6mua7l", contents);
+ }
+
+ private void deleteRecursively(final File file) { // Deletes a file or an entire directory tree, asserting that every deletion succeeds
+ if (file.isDirectory()) {
+ final File[] children = file.listFiles();
+ assertNotNull(children);
+
+ for (final File child : children) {
+ deleteRecursively(child);
+ }
+ }
+ assertTrue(file.delete());
+ }
+}
diff --git a/nifi-system-tests/pom.xml b/nifi-system-tests/pom.xml
index 8446edce34..c8253c2e46 100644
--- a/nifi-system-tests/pom.xml
+++ b/nifi-system-tests/pom.xml
@@ -29,6 +29,7 @@
<module>nifi-system-test-extensions2-bundle</module>
<module>nifi-alternate-config-extensions-bundle</module>
<module>nifi-system-test-nar-provider-bundles</module>
+ <module>nifi-python-test-extensions-nar</module>
<module>nifi-system-test-suite</module>
<module>nifi-stateless-system-test-suite</module>
</modules>