Alex Ethier created NIFI-12739:
----------------------------------
Summary: Python custom processor cannot import ProcessPoolExecutor
Key: NIFI-12739
URL: https://issues.apache.org/jira/browse/NIFI-12739
Project: Apache NiFi
Issue Type: Bug
Components: Extensions
Affects Versions: 2.0.0-M2
Reporter: Alex Ethier
A runtime exception is thrown when a Python custom processor imports
ProcessPoolExecutor. The same failure affects libraries such as llama-index
that import ProcessPoolExecutor internally.
My system's full stack trace (see below for a simpler stack trace):
{code:java}
py4j.Py4JException: An exception was raised by the Python Proxy. Return
Message: Traceback (most recent call last):
File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line
2466, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in
createProcessor
processorClass = self.extensionManager.getProcessorClass(processorType,
version, work_dir)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line
104, in getProcessorClass
processor_class = self.__load_extension_module(module_file,
details.local_dependencies)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line
360, in __load_extension_module
module_spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 940, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File
"/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py",
line 4, in <module>
from llama_index import GPTVectorStoreIndex, StorageContext,
ServiceContext, Document
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py",
line 24, in <module>
from llama_index.indices import (
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py",
line 4, in <module>
from llama_index.indices.composability.graph import ComposableGraph
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py",
line 4, in <module>
from llama_index.indices.composability.graph import ComposableGraph
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py",
line 7, in <module>
from llama_index.indices.base import BaseIndex
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py",
line 10, in <module>
from llama_index.ingestion import run_transformations
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py",
line 2, in <module>
from llama_index.ingestion.pipeline import (
File
"/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py",
line 5, in <module>
from concurrent.futures import ProcessPoolExecutor
File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
File
"/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py",
line 44, in __getattr__
from .process import ProcessPoolExecutor as pe
File
"/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py",
line 106, in <module>
threading._register_atexit(_python_exit)
File
"/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py",
line 1527, in _register_atexit
raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown
at py4j.Protocol.getReturnValue(Protocol.java:476)
at
org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
at
org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
at
org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
at
org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
at java.base/java.lang.VirtualThread.run(VirtualThread.java:309){code}
Note that the problem occurs with both Python 3.9 and Python 3.11, and on both
the NiFi 2.0.0 release and the main branch.
The following is a simpler stack trace snippet:
{code:java}
Traceback (most recent call last):
File "/configuration_resources/python_extensions/ImportTestProcessor.py",
line 26, in transform
from concurrent.futures import ProcessPoolExecutor
File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in
__getattr__
from .process import ProcessPoolExecutor as pe
File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
threading._register_atexit(_python_exit)
File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown{code}
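Both traces end in threading._register_atexit, which refuses new registrations
once the interpreter's threading shutdown has started. One possible
explanation, not confirmed by this report, is that the import runs on a thread
that outlives the Python main thread. The sketch below tries to show the same
condition outside NiFi: a child interpreter starts a non-daemon thread, lets
its main thread finish, and only then performs the import. The names
CHILD_SCRIPT and run_late_import_probe are hypothetical and are not part of
NiFi or py4j.

```python
import subprocess
import sys
import textwrap

# Script run in a separate interpreter so that its main thread can really end.
CHILD_SCRIPT = textwrap.dedent('''
    import threading, time, traceback

    def late_import():
        time.sleep(0.5)  # give the main thread time to finish
        try:
            from concurrent.futures import ProcessPoolExecutor
            print("import ok")
        except RuntimeError:
            print(traceback.format_exc())

    threading.Thread(target=late_import).start()
    # The main thread ends here while the non-daemon thread keeps running.
''')


def run_late_import_probe():
    """Run CHILD_SCRIPT in a fresh interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return result.stdout


if __name__ == "__main__":
    print(run_late_import_probe())
```

If the probe prints the same RuntimeError as above, that would be consistent
with this hypothesis, but it would not prove that NiFi's Python bridge fails
for the same reason.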
When the import fails, the Python stack trace can be hard to find in the logs,
and the processor does not always repeat the initialization, so the stack trace
may be reported only once.
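As a rough aid for locating such a trace, the sketch below pulls Python
traceback blocks out of a chunk of log text. It assumes standard CPython
traceback formatting (indented frame lines followed by an unindented exception
line); the function name extract_python_tracebacks is hypothetical, and the
layout of a real NiFi log may differ.

```python
def extract_python_tracebacks(log_text):
    """Return each Python traceback found in log_text as one string."""
    marker = "Traceback (most recent call last):"
    blocks = []
    current = None  # lines of the traceback being collected, if any
    for line in log_text.splitlines():
        if current is None:
            if marker in line:
                # Drop any log prefix that precedes the marker.
                current = [line[line.index(marker):]]
        elif line.startswith((" ", "\t")):
            # Indented lines are stack frames or source excerpts.
            current.append(line)
        else:
            # The first unindented line is the exception itself.
            current.append(line)
            blocks.append("\n".join(current))
            current = None
    if current is not None:
        # The log ended before an exception line was seen.
        blocks.append("\n".join(current))
    return blocks
```

It could be applied to the text of a log file read from disk; which file to
read depends on the installation and is not specified in this report.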
The following custom Python processor reproduces the stack trace snippet in an
easily repeatable way:
{code:python}
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (PropertyDescriptor, StandardValidators,
                                PropertyDependency, ExpressionLanguageScope)

###
# Test python imports
##
class ImportTestProcessor(FlowFileTransform):
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0-M1"
        description = """Test Imports"""
        tags = ["test"]

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        import traceback
        stack_trace_str = ""
        try:
            # The import under test; it fails inside NiFi.
            from concurrent.futures import ProcessPoolExecutor
        except Exception as e:
            # Return the traceback as the FlowFile content.
            stack_trace_str = f"Exception:\n{traceback.format_exc()}"
            return FlowFileTransformResult(
                relationship="success", contents=stack_trace_str
            )
        # The import succeeded; pass the FlowFile through unchanged.
        return FlowFileTransformResult(
            relationship="success"
        )
{code}
When this processor runs, the stack trace appears in the content of the
output FlowFile.
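The capture-the-traceback pattern used in transform can also be written as a
small standalone helper for checking other imports. This is only a sketch: the
name capture_import_error and its parameters are hypothetical and are not part
of the NiFi Python API.

```python
import importlib
import traceback


def capture_import_error(module_name, attribute=None):
    """Import module_name (and optionally read one attribute from it).

    Returns an empty string on success, or the formatted traceback
    prefixed with "Exception:" when the import or lookup fails.
    """
    try:
        module = importlib.import_module(module_name)
        if attribute is not None:
            # Attribute access triggers lazy loaders such as the
            # module-level __getattr__ in concurrent.futures.
            getattr(module, attribute)
    except Exception:
        return f"Exception:\n{traceback.format_exc()}"
    return ""
```

For example, capture_import_error("concurrent.futures", "ProcessPoolExecutor")
exercises the same lazy import path that appears in the stack traces above.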