This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 754baf0a37 NIFI-12308: Create Python Environment in background thread
instead of during Processor creation
754baf0a37 is described below
commit 754baf0a37cb156cbab08ca7884a7ccfe12b7df1
Author: Mark Payne <[email protected]>
AuthorDate: Wed Nov 1 14:02:12 2023 -0400
NIFI-12308: Create Python Environment in background thread instead of
during Processor creation
This closes #7971
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/components/AsyncLoadedProcessor.java | 6 +-
.../nifi/controller/AbstractComponentNode.java | 1 +
.../components/ClassLoaderAwarePythonBridge.java | 3 +-
.../apache/nifi/controller/ExtensionBuilder.java | 45 ++--
.../nar/StandardExtensionDiscoveringManager.java | 69 ++---
.../org/apache/nifi/py4j/StandardPythonBridge.java | 28 +-
.../nifi/py4j/StandardPythonProcessorBridge.java | 77 ++----
.../python/processor/FlowFileTransformProxy.java | 9 +-
.../python/processor/PythonProcessorProxy.java | 103 +++++++-
.../python/processor/RecordTransformProxy.java | 9 +-
.../nifi-py4j-integration-tests/pom.xml | 12 +-
.../PythonControllerInteractionIT.java | 294 +++++++--------------
.../apache/nifi/python/DisabledPythonBridge.java | 4 +-
.../java/org/apache/nifi/python/PythonBridge.java | 7 +-
.../python/processor/PythonProcessorBridge.java | 10 +-
15 files changed, 309 insertions(+), 368 deletions(-)
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
index 14f7e31cb8..68ce2953bf 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/AsyncLoadedProcessor.java
@@ -17,7 +17,9 @@
package org.apache.nifi.components;
-public interface AsyncLoadedProcessor {
+import org.apache.nifi.processor.Processor;
+
+public interface AsyncLoadedProcessor extends Processor {
default boolean isLoaded() {
return getState() == LoadState.FINISHED_LOADING;
}
@@ -25,6 +27,8 @@ public interface AsyncLoadedProcessor {
LoadState getState();
enum LoadState {
+ INITIALIZING_ENVIRONMENT,
+
DOWNLOADING_DEPENDENCIES,
LOADING_PROCESSOR_CODE,
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index c9fc336381..0b6d3bbd72 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -878,6 +878,7 @@ public abstract class AbstractComponentNode implements
ComponentNode {
if (component instanceof final AsyncLoadedProcessor
asyncLoadedProcessor) {
if (!asyncLoadedProcessor.isLoaded()) {
final String explanation = switch
(asyncLoadedProcessor.getState()) {
+ case INITIALIZING_ENVIRONMENT -> "Initializing runtime
environment for the Processor.";
case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download
one or more Processor dependencies. See logs for additional details.";
case DOWNLOADING_DEPENDENCIES -> "In the process of
downloading third-party dependencies required by the Processor.";
case LOADING_PROCESSOR_CODE -> "In the process of
loading Processor code";
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
index 06bcd4dc70..8a49feaf92 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/ClassLoaderAwarePythonBridge.java
@@ -22,7 +22,6 @@ import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
import java.io.IOException;
import java.util.List;
@@ -97,7 +96,7 @@ public class ClassLoaderAwarePythonBridge implements
PythonBridge {
}
@Override
- public PythonProcessorBridge createProcessor(final String identifier,
final String type, final String version, final boolean preferIsolatedProcess) {
+ public AsyncLoadedProcessor createProcessor(final String identifier, final
String type, final String version, final boolean preferIsolatedProcess) {
try (final NarCloseable narCloseable =
NarCloseable.withComponentNarLoader(classLoader)) {
return delegate.createProcessor(identifier, type, version,
preferIsolatedProcess);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 43ee011ea2..13c7609896 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -16,16 +16,6 @@
*/
package org.apache.nifi.controller;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Proxy;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
@@ -77,8 +67,6 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.python.PythonBridge;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@@ -94,6 +82,17 @@ import org.apache.nifi.validation.RuleViolationsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Proxy;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
public class ExtensionBuilder {
private static final Logger logger =
LoggerFactory.getLogger(ExtensionBuilder.class);
@@ -750,9 +749,9 @@ public class ExtensionBuilder {
final Processor processor = processorComponent.getComponent();
- final ProcessorInitializationContext initiContext = new
StandardProcessorInitializationContext(identifier,
processorComponent.getLogger(),
+ final ProcessorInitializationContext initContext = new
StandardProcessorInitializationContext(identifier,
processorComponent.getLogger(),
serviceProvider, nodeTypeProvider, kerberosConfig);
- processor.initialize(initiContext);
+ processor.initialize(initContext);
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
verifyControllerServiceReferences(processor,
bundle.getClassLoader());
@@ -867,24 +866,14 @@ public class ExtensionBuilder {
// TODO: This is a hack because there's a bug in the UI that causes
it to not load extensions that don't have a `.` in the type.
final String processorType = type.startsWith("python.") ?
type.substring("python.".length()) : type;
- final PythonProcessorBridge processorBridge =
pythonBridge.createProcessor(identifier, processorType,
bundleCoordinate.getVersion(), true);
- final Processor processor = processorBridge.getProcessorProxy();
+ final Processor processor =
pythonBridge.createProcessor(identifier, processorType,
bundleCoordinate.getVersion(), true);
final ComponentLog componentLog = new
SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new
TerminationAwareLogger(componentLog);
- final PythonProcessorInitializationContext initContext = new
PythonProcessorInitializationContext() {
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public ComponentLog getLogger() {
- return terminationAwareLogger;
- }
- };
- processorBridge.initialize(initContext);
+ final ProcessorInitializationContext initContext = new
StandardProcessorInitializationContext(identifier, terminationAwareLogger,
+ serviceProvider, nodeTypeProvider, kerberosConfig);
+ processor.initialize(initContext);
return new LoggableComponent<>(processor, bundleCoordinate,
terminationAwareLogger);
} finally {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index 24058fd045..bed296e3c2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -16,29 +16,6 @@
*/
package org.apache.nifi.nar;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.authentication.LoginIdentityProvider;
import org.apache.nifi.authorization.AccessPolicyProvider;
@@ -63,16 +40,12 @@ import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.init.ConfigurableComponentInitializer;
import org.apache.nifi.init.ConfigurableComponentInitializerFactory;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.nar.ExtensionDefinition.ExtensionRuntime;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingTask;
@@ -80,6 +53,30 @@ import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
/**
* Scans through the classpath to load all FlowFileProcessors,
FlowFileComparators, and ReportingTasks using the service provider API and
running through all classloaders (root, NARs).
*
@@ -711,23 +708,7 @@ public class StandardExtensionDiscoveringManager
implements ExtensionDiscovering
final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ?
classType.substring(PYTHON_TYPE_PREFIX.length()) : classType;
final String procId = "temp-component-" + type;
- final PythonProcessorBridge processorBridge =
pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(),
false);
- tempComponent = processorBridge.getProcessorProxy();
-
- final ComponentLog componentLog = new MockComponentLogger();
- final PythonProcessorInitializationContext initContext = new
PythonProcessorInitializationContext() {
- @Override
- public String getIdentifier() {
- return procId;
- }
-
- @Override
- public ComponentLog getLogger() {
- return componentLog;
- }
- };
-
- processorBridge.initialize(initContext);
+ tempComponent = pythonBridge.createProcessor(procId, type,
bundleCoordinate.getVersion(), false);
} else {
final Class<?> componentClass = Class.forName(classType, true,
bundleClassLoader);
tempComponent = (ConfigurableComponent)
componentClass.getDeclaredConstructor().newInstance();
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
index faa1afdd2d..64392717c8 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonBridge.java
@@ -17,6 +17,7 @@
package org.apache.nifi.py4j;
+import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
@@ -24,8 +25,12 @@ import
org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
+import org.apache.nifi.python.processor.FlowFileTransform;
+import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.python.processor.RecordTransform;
+import org.apache.nifi.python.processor.RecordTransformProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
public class StandardPythonBridge implements PythonBridge {
@@ -89,8 +95,7 @@ public class StandardPythonBridge implements PythonBridge {
controllerProcess.getController().discoverExtensions(extensionsDirs,
workDirPath);
}
- @Override
- public PythonProcessorBridge createProcessor(final String identifier,
final String type, final String version, final boolean preferIsolatedProcess) {
+ private PythonProcessorBridge createProcessorBridge(final String
identifier, final String type, final String version, final boolean
preferIsolatedProcess) {
ensureStarted();
logger.debug("Creating Python Processor of type {}", type);
@@ -127,6 +132,25 @@ public class StandardPythonBridge implements PythonBridge {
return processorBridge;
}
+ @Override
+ public AsyncLoadedProcessor createProcessor(final String identifier, final
String type, final String version, final boolean preferIsolatedProcess) {
+ final PythonProcessorDetails processorDetails =
getProcessorTypes().stream()
+ .filter(details -> details.getProcessorType().equals(type))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Unknown Python
Processor type: " + type));
+
+ final String implementedInterface = processorDetails.getInterface();
+ final Supplier<PythonProcessorBridge> processorBridgeFactory = () ->
createProcessorBridge(identifier, type, version, preferIsolatedProcess);
+
+ if (FlowFileTransform.class.getName().equals(implementedInterface)) {
+ return new FlowFileTransformProxy(type, processorBridgeFactory);
+ }
+ if (RecordTransform.class.getName().equals(implementedInterface)) {
+ return new RecordTransformProxy(type, processorBridgeFactory);
+ }
+ return null;
+ }
+
@Override
public synchronized void onProcessorRemoved(final String identifier, final
String type, final String version) {
final ExtensionId extensionId = new ExtensionId(type, version);
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
index 7ae9cef081..ba9d95fe01 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java
@@ -17,24 +17,16 @@
package org.apache.nifi.py4j;
-import org.apache.nifi.processor.Processor;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.FlowFileTransform;
-import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
-import org.apache.nifi.python.processor.PythonProcessorProxy;
-import org.apache.nifi.python.processor.RecordTransform;
-import org.apache.nifi.python.processor.RecordTransformProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
@@ -45,7 +37,6 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
private final ProcessorCreationWorkflow creationWorkflow;
private final PythonProcessorDetails processorDetails;
private volatile PythonProcessorAdapter adapter;
- private final PythonProcessorProxy proxy;
private final File workingDir;
private final File moduleFile;
private volatile long lastModified;
@@ -60,8 +51,6 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
this.workingDir = builder.workDir;
this.moduleFile = builder.moduleFile;
this.lastModified = this.moduleFile.lastModified();
-
- this.proxy = createProxy();
}
@Override
@@ -70,46 +59,35 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
}
@Override
- public Future<Void> initialize(final PythonProcessorInitializationContext
context) {
+ public void initialize(final PythonProcessorInitializationContext context)
{
this.initializationContext = context;
final String threadName = "Initialize Python Processor %s
(%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
- final CompletableFuture<Void> future = new CompletableFuture<>();
-
- Thread.ofVirtual().name(threadName).start(() ->
initializePythonSide(future));
- return future;
+ Thread.ofVirtual().name(threadName).start(this::initializePythonSide);
}
public LoadState getLoadState() {
return loadState;
}
- private void initializePythonSide(final CompletableFuture<Void> future) {
+ private void initializePythonSide() {
try {
- try {
- creationWorkflow.downloadDependencies();
- loadState = LoadState.LOADING_PROCESSOR_CODE;
- } catch (final Exception e) {
- loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
- throw e;
- }
-
- final PythonProcessorAdapter pythonProcessorAdapter;
- try {
- pythonProcessorAdapter = creationWorkflow.createProcessor();
- pythonProcessorAdapter.initialize(initializationContext);
- this.adapter = pythonProcessorAdapter;
- this.proxy.onPythonSideInitialized(pythonProcessorAdapter);
-
- loadState = LoadState.FINISHED_LOADING;
- } catch (final Exception e) {
- loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
- throw e;
- }
+ creationWorkflow.downloadDependencies();
+ loadState = LoadState.LOADING_PROCESSOR_CODE;
+ } catch (final Exception e) {
+ loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
+ throw e;
+ }
- future.complete(null);
- } catch (final Throwable t) {
- future.completeExceptionally(t);
+ final PythonProcessorAdapter pythonProcessorAdapter;
+ try {
+ pythonProcessorAdapter = creationWorkflow.createProcessor();
+ pythonProcessorAdapter.initialize(initializationContext);
+ this.adapter = pythonProcessorAdapter;
+ loadState = LoadState.FINISHED_LOADING;
+ } catch (final Exception e) {
+ loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
+ throw e;
}
}
@@ -118,11 +96,6 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
return processorDetails.getProcessorType();
}
- @Override
- public Processor getProcessorProxy() {
- return proxy;
- }
-
@Override
public boolean reload() {
if (moduleFile.lastModified() <= lastModified) {
@@ -131,24 +104,12 @@ public class StandardPythonProcessorBridge implements
PythonProcessorBridge {
}
controller.reloadProcessor(getProcessorType(),
processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
- initializePythonSide(new CompletableFuture<>());
+ initializePythonSide();
lastModified = moduleFile.lastModified();
return true;
}
- private PythonProcessorProxy createProxy() {
- final String implementedInterface = processorDetails.getInterface();
- if (FlowFileTransform.class.getName().equals(implementedInterface)) {
- return new FlowFileTransformProxy(this);
- }
- if (RecordTransform.class.getName().equals(implementedInterface)) {
- return new RecordTransformProxy(this);
- }
-
- throw new IllegalArgumentException("Python Processor does not
implement any of the valid interfaces. Interface implemented: " +
implementedInterface);
- }
-
public static class Builder {
private PythonController controller;
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
index d7f46cda67..0af61d738a 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileTransformProxy.java
@@ -29,21 +29,20 @@ import py4j.Py4JNetworkException;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FlowFileTransformProxy extends PythonProcessorProxy {
- private final PythonProcessorBridge bridge;
private volatile FlowFileTransform transform;
- public FlowFileTransformProxy(final PythonProcessorBridge bridge) {
- super(bridge);
- this.bridge = bridge;
+ public FlowFileTransformProxy(final String processorType, final
Supplier<PythonProcessorBridge> bridgeFactory) {
+ super(processorType, bridgeFactory);
}
-
@OnScheduled
public void setContext(final ProcessContext context) {
+ final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new
IllegalStateException(this + " is not finished initializing"));
final Optional<PythonProcessorAdapter> optionalAdapter =
bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
throw new IllegalStateException(this + " is not finished
initializing");
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
index dfcf5fd8c5..1b9383819e 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/PythonProcessorProxy.java
@@ -26,8 +26,10 @@ import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import java.util.Collection;
@@ -38,15 +40,18 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@SupportsSensitiveDynamicProperties
public abstract class PythonProcessorProxy extends AbstractProcessor
implements AsyncLoadedProcessor {
- private final PythonProcessorBridge bridge;
+ private final String processorType;
+ private volatile PythonProcessorInitializationContext initContext;
+ private volatile PythonProcessorBridge bridge;
private volatile Set<Relationship> cachedRelationships = null;
private volatile List<PropertyDescriptor> cachedPropertyDescriptors = null;
private volatile Map<String, PropertyDescriptor> cachedDynamicDescriptors
= null;
- private volatile boolean supportsDynamicProperties;
+ private volatile Boolean supportsDynamicProperties;
protected static final Relationship REL_ORIGINAL = new
Relationship.Builder()
.name("original")
@@ -61,16 +66,55 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
REL_ORIGINAL,
REL_FAILURE);
- public PythonProcessorProxy(final PythonProcessorBridge bridge) {
- this.bridge = bridge;
+ public PythonProcessorProxy(final String processorType, final
Supplier<PythonProcessorBridge> bridgeFactory) {
+ this.processorType = processorType;
+
+ Thread.ofVirtual().name("Initialize " + processorType).start(() -> {
+ this.bridge = bridgeFactory.get();
+
+ // If initialization context has already been set, initialize
bridge.
+ final PythonProcessorInitializationContext pythonInitContext =
initContext;
+ if (pythonInitContext != null) {
+ this.bridge.initialize(pythonInitContext);
+ }
+ });
}
- public void onPythonSideInitialized(final PythonProcessorAdapter adapter) {
- supportsDynamicProperties = adapter.isDynamicPropertySupported();
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+
+ final PythonProcessorInitializationContext initContext = new
PythonProcessorInitializationContext() {
+ @Override
+ public String getIdentifier() {
+ return context.getIdentifier();
+ }
+
+ @Override
+ public ComponentLog getLogger() {
+ return context.getLogger();
+ }
+ };
+
+ this.initContext = initContext;
+
+ // If Bridge has already been set, initialize it.
+ final PythonProcessorBridge bridge = this.bridge;
+ if (bridge != null) {
+ bridge.initialize(initContext);
+ }
+ }
+
+ protected Optional<PythonProcessorBridge> getBridge() {
+ return Optional.ofNullable(this.bridge);
}
@Override
public LoadState getState() {
+ if (bridge == null) {
+ return LoadState.INITIALIZING_ENVIRONMENT;
+ }
+
return bridge.getLoadState();
}
@@ -80,6 +124,10 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
return this.cachedPropertyDescriptors;
}
+ if (bridge == null) {
+ return Collections.emptyList();
+ }
+
final Optional<PythonProcessorAdapter> optionalAdapter =
bridge.getProcessorAdapter();
if (optionalAdapter.isEmpty()) {
// If we don't have the adapter yet, use whatever is cached, even
if it's old, or an empty List if we have nothing cached.
@@ -99,6 +147,14 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
@Override
protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ if (bridge == null) {
+ return List.of(new ValidationResult.Builder()
+ .subject("Processor")
+ .explanation("Python environment is not yet initialized")
+ .valid(false)
+ .build());
+ }
+
final LoadState loadState = bridge.getLoadState();
if (loadState == LoadState.LOADING_PROCESSOR_CODE || loadState ==
LoadState.DOWNLOADING_DEPENDENCIES) {
return List.of(new ValidationResult.Builder()
@@ -144,6 +200,10 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
return cachedDynamicDescriptors.get(propertyDescriptorName);
}
+ if (bridge == null) {
+ return null;
+ }
+
try {
final Optional<PythonProcessorAdapter> optionalAdapter =
bridge.getProcessorAdapter();
return optionalAdapter.map(adapter ->
adapter.getSupportedDynamicPropertyDescriptor(propertyDescriptorName))
@@ -155,7 +215,18 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
}
protected boolean isSupportsDynamicPropertyDescriptor() {
- return supportsDynamicProperties;
+ if (this.supportsDynamicProperties != null) {
+ return supportsDynamicProperties;
+ }
+
+ if (bridge == null) {
+ return false;
+ }
+
+ final Optional<PythonProcessorAdapter> adapter =
bridge.getProcessorAdapter();
+ final boolean supported =
adapter.map(PythonProcessorAdapter::isDynamicPropertySupported).orElse(false);
+ supportsDynamicProperties = supported;
+ return supported;
}
@OnScheduled
@@ -193,6 +264,10 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
}
private Set<Relationship> fetchRelationshipsFromPythonProcessor() {
+ if (bridge == null) {
+ return Collections.emptySet();
+ }
+
Set<Relationship> processorRelationships;
try {
processorRelationships = bridge.getProcessorAdapter()
@@ -210,6 +285,10 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
@OnScheduled
public void onScheduled(final ProcessContext context) {
+ if (bridge == null) {
+ throw new IllegalStateException("Processor is not yet
initialized");
+ }
+
reload();
bridge.getProcessorAdapter()
.orElseThrow(() -> new IllegalStateException("Processor has not
finished initializing"))
@@ -218,6 +297,10 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
@OnStopped
public void onStopped(final ProcessContext context) {
+ if (bridge == null) {
+ throw new IllegalStateException("Processor is not yet
initialized");
+ }
+
bridge.getProcessorAdapter()
.orElseThrow(() -> new IllegalStateException("Processor has not
finished initializing"))
.onStopped(context);
@@ -225,10 +308,14 @@ public abstract class PythonProcessorProxy extends
AbstractProcessor implements
@Override
public String toString() {
- return "PythonProcessor[type=" + bridge.getProcessorType() + ", id=" +
getIdentifier() + "]";
+ return "PythonProcessor[type=" + processorType + ", id=" +
getIdentifier() + "]";
}
private void reload() {
+ if (bridge == null) {
+ return;
+ }
+
final boolean reloaded = bridge.reload();
if (reloaded) {
getLogger().info("Successfully reloaded Processor");
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
index 93f140b052..ecae852962 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java
@@ -56,10 +56,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.function.Supplier;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class RecordTransformProxy extends PythonProcessorProxy {
- private final PythonProcessorBridge bridge;
private volatile RecordTransform transform;
static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
@@ -78,9 +78,8 @@ public class RecordTransformProxy extends
PythonProcessorProxy {
.build();
- public RecordTransformProxy(final PythonProcessorBridge bridge) {
- super(bridge);
- this.bridge = bridge;
+ public RecordTransformProxy(final String processorType, final
Supplier<PythonProcessorBridge> bridgeFactory) {
+ super(processorType, bridgeFactory);
}
@Override
@@ -94,6 +93,8 @@ public class RecordTransformProxy extends
PythonProcessorProxy {
@OnScheduled
public void setProcessContext(final ProcessContext context) {
+ final PythonProcessorBridge bridge = getBridge().orElseThrow(() -> new
IllegalStateException(this + " is not finished initializing"));
+
final Optional<PythonProcessorAdapter> adapterOptional =
bridge.getProcessorAdapter();
if (adapterOptional.isEmpty()) {
throw new IllegalStateException(this + " is not finished
initializing");
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
index 188ad20cfc..2e272e6451 100644
--- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
+++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/pom.xml
@@ -82,7 +82,17 @@
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
index 5660fb277d..30b7e9e1b4 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java
@@ -17,34 +17,28 @@
package org.apache.nifi.py4j;
+import org.apache.nifi.components.AsyncLoadedProcessor;
+import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.mock.MockProcessorInitializationContext;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
-import org.apache.nifi.python.processor.EmptyAttributeMap;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
-import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
-import org.apache.nifi.python.processor.RecordTransform;
-import org.apache.nifi.python.processor.RecordTransformResult;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.TestRunner;
@@ -54,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -74,13 +67,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -166,7 +156,7 @@ public class PythonControllerInteractionIT {
final List<PythonProcessorDetails> extensionDetails =
bridge.getProcessorTypes();
final List<String> types = extensionDetails.stream()
.map(PythonProcessorDetails::getProcessorType)
- .collect(Collectors.toList());
+ .toList();
assertTrue(types.contains(PRETTY_PRINT_JSON));
assertTrue(types.contains("ConvertCsvToExcel"));
@@ -186,10 +176,7 @@ public class PythonControllerInteractionIT {
// Create a PrettyPrintJson Processor
final byte[] jsonContent =
Files.readAllBytes(Paths.get("src/test/resources/json/input/simple-person.json"));
for (int i=0; i < 3; i++) {
- final PythonProcessorBridge prettyPrintJson =
createProcessor(PRETTY_PRINT_JSON);
- assertNotNull(prettyPrintJson);
-
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(prettyPrintJson);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue(jsonContent);
@@ -203,11 +190,7 @@ public class PythonControllerInteractionIT {
@Disabled("Just for manual testing...")
public void runPrettyPrintJsonManyThreads() throws IOException {
// Create a PrettyPrintJson Processor
- final PythonProcessorBridge prettyPrintJson =
createProcessor(PRETTY_PRINT_JSON);
- assertNotNull(prettyPrintJson);
-
- // Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(prettyPrintJson);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final int flowFileCount = 100_000;
@@ -226,12 +209,8 @@ public class PythonControllerInteractionIT {
@Test
public void testSimplePrettyPrint() throws IOException {
- // Discover extensions so that they can be created
- final PythonProcessorBridge prettyPrintJson =
createProcessor(PRETTY_PRINT_JSON);
- assertNotNull(prettyPrintJson);
-
// Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(prettyPrintJson);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue(Paths.get("src/test/resources/json/input/simple-person.json"));
runner.setProperty("Indentation", "2");
@@ -250,8 +229,7 @@ public class PythonControllerInteractionIT {
@Test
public void testValidator() {
- final PythonProcessorBridge prettyPrintJson =
createProcessor(PRETTY_PRINT_JSON);
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(prettyPrintJson);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform(PRETTY_PRINT_JSON);
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.setProperty("Indentation", "-1");
@@ -273,11 +251,7 @@ public class PythonControllerInteractionIT {
@Test
public void testCsvToExcel() {
// Create a PrettyPrintJson Processor
- final PythonProcessorBridge csvToExcel =
createProcessor("ConvertCsvToExcel");
- assertNotNull(csvToExcel);
-
- // Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(csvToExcel);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("ConvertCsvToExcel");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue("name, number\nJohn Doe, 500");
@@ -289,11 +263,8 @@ public class PythonControllerInteractionIT {
@Test
public void testExpressionLanguageWithAttributes() {
- final PythonProcessorBridge writeProperty =
createProcessor("WritePropertyToFlowFile");
- assertNotNull(writeProperty);
-
// Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(writeProperty);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("WritePropertyToFlowFile");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.setProperty("Message", "Hola Mundo");
runner.enqueue("Hello World");
@@ -308,11 +279,7 @@ public class PythonControllerInteractionIT {
@Test
public void testPythonPackage() {
// Create a WriteNumber Processor
- final PythonProcessorBridge procBridge =
createProcessor("WriteNumber");
- assertNotNull(procBridge);
-
- // Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(procBridge);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("WriteNumber");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue("");
@@ -326,6 +293,14 @@ public class PythonControllerInteractionIT {
assertTrue(resultNum <= 1000);
}
+ private FlowFileTransformProxy createFlowFileTransform(final String type) {
+ final Processor processor = createProcessor(type);
+ assertNotNull(processor);
+
+ processor.initialize(new MockProcessorInitializationContext());
+ return (FlowFileTransformProxy) processor;
+ }
+
@Test
public void testImportRequirements() {
// Discover extensions so that they can be created
@@ -340,12 +315,8 @@ public class PythonControllerInteractionIT {
assertEquals(1, dependencies.size());
assertEquals("numpy==1.25.0", dependencies.get(0));
- // Create a PrettyPrintJson Processor
- final PythonProcessorBridge writeNumPyVersion =
createProcessor("WriteNumpyVersion");
- assertNotNull(writeNumPyVersion);
-
// Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(writeNumPyVersion);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("WriteNumpyVersion");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue("Hello World");
@@ -359,13 +330,9 @@ public class PythonControllerInteractionIT {
@Test
public void testControllerService() throws InitializationException {
- final PythonProcessorBridge processor =
createProcessor("LookupAddress");
- assertNotNull(processor);
-
- controllerServiceMap.put("StringLookupService",
TestLookupService.class);
-
// Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(processor);
+ controllerServiceMap.put("StringLookupService",
TestLookupService.class);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("LookupAddress");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
final StringLookupService lookupService = new
TestLookupService((Collections.singletonMap("John Doe", "123 My Street")));
runner.addControllerService("lookup", lookupService);
@@ -392,13 +359,8 @@ public class PythonControllerInteractionIT {
// Ensure that we started with "Hello, World" because if the test is
run multiple times, we may already be starting with the modified version
replaceFileText(sourceFile, replacement, originalMessage);
- // Create a PrettyPrintJson Processor
- final PythonProcessorBridge processor =
createProcessor("WriteMessage");
- processor.initialize(createInitContext());
- assertNotNull(processor);
-
// Setup
- final FlowFileTransformProxy wrapper = new
FlowFileTransformProxy(processor);
+ final FlowFileTransformProxy wrapper =
createFlowFileTransform("WriteMessage");
final TestRunner runner = TestRunners.newTestRunner(wrapper);
runner.enqueue("");
@@ -465,8 +427,7 @@ public class PythonControllerInteractionIT {
assertEquals(1, v2Count);
// Create a WriteMessage Processor, version 0.0.1-SNAPSHOT
- final PythonProcessorBridge procV1 = createProcessor("WriteMessage");
- final FlowFileTransformProxy wrapperV1 = new
FlowFileTransformProxy(procV1);
+ final FlowFileTransformProxy wrapperV1 =
createFlowFileTransform("WriteMessage");
final TestRunner runnerV1 = TestRunners.newTestRunner(wrapperV1);
runnerV1.enqueue("");
@@ -477,9 +438,8 @@ public class PythonControllerInteractionIT {
runnerV1.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello,
World");
// Create an instance of WriteMessage V2
- final PythonProcessorBridge procV2 = createProcessor("WriteMessage",
"0.0.2-SNAPSHOT");
- final FlowFileTransformProxy wrapperV2 = new
FlowFileTransformProxy(procV2);
- final TestRunner runnerV2 = TestRunners.newTestRunner(wrapperV2);
+ final Processor procV2 = createProcessor("WriteMessage",
"0.0.2-SNAPSHOT");
+ final TestRunner runnerV2 = TestRunners.newTestRunner(procV2);
runnerV2.enqueue("");
// Trigger the processor
@@ -490,40 +450,23 @@ public class PythonControllerInteractionIT {
}
@Test
- public void testRecordTransformWithDynamicProperties() {
- // Create a PrettyPrintJson Processor
- final PythonProcessorBridge processor =
createProcessor("SetRecordField");
- assertNotNull(processor);
-
- // Mock out ProcessContext to reflect that the processor should set
the 'name' field to 'Jane Doe'
- final PropertyDescriptor nameDescriptor = new
PropertyDescriptor.Builder()
- .name("name")
- .dynamic(true)
- .addValidator(Validator.VALID)
- .build();
- final PropertyDescriptor numberDescriptor = new
PropertyDescriptor.Builder()
- .name("number")
- .dynamic(true)
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
- .build();
-
- final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
- propertyMap.put(nameDescriptor, "Jane Doe");
- propertyMap.put(numberDescriptor, "8");
-
- final ProcessContext context = createContext(propertyMap);
+ public void testRecordTransformWithDynamicProperties() throws
InitializationException {
+ // Create a SetRecordField Processor
+ final TestRunner runner =
createRecordTransformRunner("SetRecordField");
+ runner.setProperty("name", "Jane Doe");
+ runner.setProperty("number", "8");
// Create a Record to transform and transform it
final String json = "[{ \"name\": \"John Doe\" }]";
- final RecordSchema schema = createSimpleRecordSchema("name");
- final RecordTransform recordTransform = (RecordTransform)
processor.getProcessorAdapter().get().getProcessor();
- recordTransform.setContext(context);
- final RecordTransformResult result =
recordTransform.transformRecord(json, schema, new EmptyAttributeMap()).get(0);
+ runner.enqueue(json);
+ runner.run();
+ runner.assertTransferCount("original", 1);
+ runner.assertTransferCount("success", 1);
// Verify the results
- assertEquals("success", result.getRelationship());
- assertNull(result.getSchema());
- assertEquals("{\"name\": \"Jane Doe\", \"number\": \"8\"}",
result.getRecordJson());
+ final MockFlowFile out =
runner.getFlowFilesForRelationship("success").get(0);
+ out.assertContentEquals("""
+ [{"name":"Jane Doe","number":"8"}]""");
}
private ProcessContext createContext(final Map<PropertyDescriptor, String>
propertyValues) {
@@ -543,72 +486,44 @@ public class PythonControllerInteractionIT {
return context;
}
- @Test
- public void testRecordTransformWithInnerRecord() {
- // Create a PrettyPrintJson Processor
- final PythonProcessorBridge processor =
createProcessor("SetRecordField");
+ private TestRunner createRecordTransformRunner(final String type) throws
InitializationException {
+ final Processor processor = createProcessor("SetRecordField");
assertNotNull(processor);
- // Mock out ProcessContext to reflect that the processor should set
the 'name' field to 'Jane Doe'
- final PropertyDescriptor nameDescriptor = new
PropertyDescriptor.Builder()
- .name("name")
- .dynamic(true)
- .addValidator(Validator.VALID)
- .build();
-
- final Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
- propertyMap.put(nameDescriptor, "Jane Doe");
- final ProcessContext context = createContext(propertyMap);
-
- // Create a Record to transform and transform it
- final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\":
\"John Doe\" }}]";
- final RecordSchema recordSchema = createTwoLevelRecord().getSchema();
- final RecordTransform recordTransform = (RecordTransform)
processor.getProcessorAdapter().get().getProcessor();
- recordTransform.setContext(context);
- final RecordTransformResult result =
recordTransform.transformRecord(json, recordSchema, new
EmptyAttributeMap()).get(0);
+ final JsonTreeReader reader = new JsonTreeReader();
+ final JsonRecordSetWriter writer = new JsonRecordSetWriter();
- // Verify the results
- assertEquals("success", result.getRelationship());
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("reader", reader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(reader);
+ runner.enableControllerService(writer);
+ runner.setProperty("Record Reader", "reader");
+ runner.setProperty("Record Writer", "writer");
- assertEquals("{\"name\": \"Jane Doe\", \"father\": {\"name\": \"John
Doe\"}}", result.getRecordJson());
+ return runner;
}
-
@Test
- public void testLogger() throws ExecutionException, InterruptedException {
- bridge.discoverExtensions();
-
- final String procId = createId();
- // Create the Processor, but we do not use this.createProcessor()
because we want to explicitly have access to the
- // initialization context to inject in the logger that we want.
- final PythonProcessorBridge logContentsBridge =
bridge.createProcessor(procId, "LogContents", VERSION, true);
-
- final ComponentLog logger = Mockito.mock(ComponentLog.class);
- final PythonProcessorInitializationContext initContext = new
PythonProcessorInitializationContext() {
- @Override
- public String getIdentifier() {
- return procId;
- }
-
- @Override
- public ComponentLog getLogger() {
- return logger;
- }
- };
+ public void testRecordTransformWithInnerRecord() throws
InitializationException {
+ // Create a SetRecordField Processor
+ final TestRunner runner =
createRecordTransformRunner("SetRecordField");
+ runner.setProperty("name", "Jane Doe");
- logContentsBridge.initialize(initContext).get();
-
- final TestRunner runner =
TestRunners.newTestRunner(logContentsBridge.getProcessorProxy());
- runner.enqueue("Hello World");
+ // Create a Record to transform and transform it
+ final String json = "[{\"name\": \"Jake Doe\", \"father\": { \"name\":
\"John Doe\" }}]";
+ runner.enqueue(json);
runner.run();
- runner.assertTransferCount("original", 1);
+ // Verify the results
runner.assertTransferCount("success", 1);
- final ArgumentCaptor<String> argumentCaptor =
ArgumentCaptor.forClass(String.class);
- Mockito.verify(logger).info(argumentCaptor.capture());
- assertEquals("Hello World", argumentCaptor.getValue());
+ runner.assertTransferCount("original", 1);
+ final MockFlowFile out =
runner.getFlowFilesForRelationship("success").get(0);
+ out.assertContentEquals("""
+ [{"name":"Jane Doe","father":{"name":"John Doe"}}]""");
}
+
private RecordSchema createSimpleRecordSchema(final String... fieldNames) {
return createSimpleRecordSchema(Arrays.asList(fieldNames));
}
@@ -623,30 +538,6 @@ public class PythonControllerInteractionIT {
return schema;
}
- private Record createSimpleRecord(final Map<String, Object> values) {
- final List<RecordField> recordFields = new ArrayList<>();
- for (final Map.Entry<String, Object> entry : values.entrySet()) {
- final DataType dataType =
DataTypeUtils.inferDataType(entry.getValue(),
RecordFieldType.STRING.getDataType());
- recordFields.add(new RecordField(entry.getKey(), dataType, true));
- }
-
- final RecordSchema schema = new SimpleRecordSchema(recordFields);
- return new MapRecord(schema, values);
- }
-
- private Record createTwoLevelRecord() {
- final Map<String, Object> innerPersonValues = new HashMap<>();
- innerPersonValues.put("name", "Jake Doe");
- final Record innerPersonRecord = createSimpleRecord(innerPersonValues);
-
- final Map<String, Object> outerPersonValues = new HashMap<>();
- outerPersonValues.put("name", "John Doe");
- outerPersonValues.put("father", innerPersonRecord);
- final Record outerPersonRecord = createSimpleRecord(outerPersonValues);
-
- return outerPersonRecord;
- }
-
public interface StringLookupService extends ControllerService {
Optional<String> lookup(Map<String, String> coordinates);
@@ -670,37 +561,40 @@ public class PythonControllerInteractionIT {
return UUID.randomUUID().toString();
}
- private PythonProcessorBridge createProcessor(final String type) {
+ private Processor createProcessor(final String type) {
return createProcessor(type, VERSION);
}
- private PythonProcessorBridge createProcessor(final String type, final
String version) {
+ private Processor createProcessor(final String type, final String version)
{
bridge.discoverExtensions();
- final PythonProcessorBridge processor =
bridge.createProcessor(createId(), type, version, true);
- final Future<Void> future = processor.initialize(createInitContext());
-
- try {
- future.get();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- }
+ final AsyncLoadedProcessor processor =
bridge.createProcessor(createId(), type, version, true);
- return processor;
- }
+ final ProcessorInitializationContext initContext = new
MockProcessorInitializationContext();
+ processor.initialize(initContext);
- private PythonProcessorInitializationContext createInitContext() {
- return new PythonProcessorInitializationContext() {
- @Override
- public String getIdentifier() {
- return "unit-test-id";
+ final long maxInitTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(30L);
+ while (true) {
+ final LoadState state = processor.getState();
+ if (state == LoadState.FINISHED_LOADING) {
+ break;
+ }
+ if (state == LoadState.DEPENDENCY_DOWNLOAD_FAILED || state ==
LoadState.LOADING_PROCESSOR_CODE_FAILED) {
+ throw new RuntimeException("Failed to initialize processor of
type %s version %s".formatted(type, version));
}
- @Override
- public ComponentLog getLogger() {
- return new MockComponentLogger();
+ if (System.currentTimeMillis() > maxInitTime) {
+ throw new RuntimeException("Timed out waiting for processor of
type %s version %s to initialize".formatted(type, version));
}
- };
+
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while initializing
processor of type %s version %s".formatted(type, version));
+ }
+ }
+ processor.initialize(new MockProcessorInitializationContext());
+ return processor;
}
+
}
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
index f33b62bc81..2911a84839 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/DisabledPythonBridge.java
@@ -17,7 +17,7 @@
package org.apache.nifi.python;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.components.AsyncLoadedProcessor;
import java.io.IOException;
import java.util.Collections;
@@ -64,7 +64,7 @@ public class DisabledPythonBridge implements PythonBridge {
}
@Override
- public PythonProcessorBridge createProcessor(final String identifier,
final String type, final String version, final boolean preferIsolatedProcess) {
+ public AsyncLoadedProcessor createProcessor(final String identifier, final
String type, final String version, final boolean preferIsolatedProcess) {
throw new UnsupportedOperationException("Cannot create Processor of
type " + type + " because Python extensions are disabled");
}
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
index dd15d75740..20d2c1f180 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/PythonBridge.java
@@ -17,7 +17,7 @@
package org.apache.nifi.python;
-import org.apache.nifi.python.processor.PythonProcessorBridge;
+import org.apache.nifi.components.AsyncLoadedProcessor;
import java.io.IOException;
import java.util.List;
@@ -88,8 +88,7 @@ public interface PythonBridge {
void discoverExtensions();
/**
- * Creates a Processor with the given identifier, type, and version. Then
returns a PythonProcessorBridge that provides access to all
- * necessary information and objects for interacting with this Processor
from the Java side.
+ * Creates a Processor with the given identifier, type, and version.
*
* @param identifier the Processor's identifier
* @param type the Processor's type
@@ -97,7 +96,7 @@ public interface PythonBridge {
* @param preferIsolatedProcess whether or not to prefer launching a
Python Process that is isolated for just this one instance of the Processor
* @return a PythonProcessorBridge that can be used for interacting with
the Processor
*/
- PythonProcessorBridge createProcessor(String identifier, String type,
String version, boolean preferIsolatedProcess);
+ AsyncLoadedProcessor createProcessor(String identifier, String type,
String version, boolean preferIsolatedProcess);
/**
* A notification that the Processor with the given identifier, type, and
version was removed from the flow. This triggers the bridge
diff --git
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
index 7d08bdfae3..0a5b1af2c9 100644
---
a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
+++
b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework-api/src/main/java/org/apache/nifi/python/processor/PythonProcessorBridge.java
@@ -18,10 +18,8 @@
package org.apache.nifi.python.processor;
import org.apache.nifi.components.AsyncLoadedProcessor.LoadState;
-import org.apache.nifi.processor.Processor;
import java.util.Optional;
-import java.util.concurrent.Future;
/**
* A model object that is used to bridge the gap between what is necessary for
the Framework to interact
@@ -34,12 +32,6 @@ public interface PythonProcessorBridge {
*/
Optional<PythonProcessorAdapter> getProcessorAdapter();
- /**
- * @return a proxy for the actual Processor implementation that will
trigger the appropriate method on the Python side, or an empty Optional
- * if the processor/adapter have not yet been initialized
- */
- Processor getProcessorProxy();
-
/**
* @return the name of the Processor implementation. This will not contain
a 'python.' prefix.
*/
@@ -57,7 +49,7 @@ public interface PythonProcessorBridge {
* Initializes the Processor
* @param context the initialization context
*/
- Future<Void> initialize(PythonProcessorInitializationContext context);
+ void initialize(PythonProcessorInitializationContext context);
/**
* @return the current state of the Processor loading