Alex Ethier created NIFI-12739:
----------------------------------

             Summary: Python custom processor cannot import ProcessPoolExecutor
                 Key: NIFI-12739
                 URL: https://issues.apache.org/jira/browse/NIFI-12739
             Project: Apache NiFi
          Issue Type: Bug
          Components: Extensions
    Affects Versions: 2.0.0-M2
            Reporter: Alex Ethier


Importing ProcessPoolExecutor in a Python custom processor raises a runtime 
exception. Libraries that import ProcessPoolExecutor themselves, such as 
llama-index, are affected as well.

My system's full stack trace (see below for a simpler stack trace):
{code:java}
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
    processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
    processor_class = self.__load_extension_module(module_file, details.local_dependencies)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
    module_spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
    from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
    from llama_index.indices import (
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
    from llama_index.indices.composability.graph import ComposableGraph
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
    from llama_index.indices.composability.graph import ComposableGraph
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
    from llama_index.indices.base import BaseIndex
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
    from llama_index.ingestion import run_transformations
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
    from llama_index.ingestion.pipeline import (
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
    from concurrent.futures import ProcessPoolExecutor
  File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
    from .process import ProcessPoolExecutor as pe
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
    threading._register_atexit(_python_exit)
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
    raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown

        at py4j.Protocol.getReturnValue(Protocol.java:476)
        at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
        at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
        at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
        at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
        at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:309){code}
Note that the problem occurs with both Python 3.9 and Python 3.11, and on both 
the NiFi 2.0.0 release and the main branch.

The following is a simpler stack trace snippet:
{code:java}
Traceback (most recent call last):
  File "/configuration_resources/python_extensions/ImportTestProcessor.py", 
line 26, in transform
    from concurrent.futures import ProcessPoolExecutor
  File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
  File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in 
__getattr__
    from .process import ProcessPoolExecutor as pe
  File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
    threading._register_atexit(_python_exit)
  File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
    raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown{code}
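
The RuntimeError in this trace is raised by threading._register_atexit, which 
concurrent/futures/process.py calls at import time. One situation that produces 
the same error in plain CPython, outside NiFi, is an import performed by a 
worker thread after the main thread has already returned. Whether that is what 
happens inside NiFi's Python process is an assumption; the report does not say. 
The sketch below only illustrates the mechanism, and the names CHILD_SOURCE and 
import_after_main_thread_exit are hypothetical. The result may depend on the 
Python version, so no output is promised here.

```python
import subprocess
import sys
import textwrap

# Child program: the main thread returns right away while a non-daemon
# worker thread is still running and performs the import slightly later.
CHILD_SOURCE = textwrap.dedent(
    """
    import threading
    import time

    def late_import():
        time.sleep(0.5)  # give the main thread time to finish
        try:
            from concurrent.futures import ProcessPoolExecutor
            print("import ok", flush=True)
        except RuntimeError as exc:
            print(f"RuntimeError: {exc}", flush=True)

    threading.Thread(target=late_import).start()
    # The main thread ends here; the interpreter now waits for the worker.
    """
)


def import_after_main_thread_exit() -> str:
    """Run the child program in a fresh interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(import_after_main_thread_exit())
```

If the child prints the same "can't register atexit after shutdown" message, 
the failure does not depend on llama-index or on the processor code itself.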
When the import fails, the Python stack trace can be hard to find in the logs, 
and the processor sometimes does not retry initialization, so the stack trace 
is reported only once.
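
Because the trace may be logged only once, one way to locate it is to scan the 
log text for the standard Python traceback header. This is a minimal sketch, 
not part of NiFi: the function name extract_tracebacks is hypothetical, the log 
file to scan is whatever path you pass in, and the heuristic (indented lines 
belong to the traceback, the first unindented line names the exception) does 
not handle chained exceptions.

```python
import sys
from typing import List

TRACEBACK_HEADER = "Traceback (most recent call last):"


def extract_tracebacks(log_text: str) -> List[str]:
    """Return each Python traceback found in log_text as one string."""
    tracebacks = []
    current = None  # lines of the traceback being collected, if any
    for line in log_text.splitlines():
        if current is None:
            # The header may follow a log prefix such as "Return Message: ".
            if TRACEBACK_HEADER in line:
                current = [TRACEBACK_HEADER]
        elif line.startswith((" ", "\t")):
            current.append(line)  # indented frame or source line
        else:
            current.append(line)  # first unindented line names the exception
            tracebacks.append("\n".join(current))
            current = None
    if current is not None:  # the log ended in the middle of a traceback
        tracebacks.append("\n".join(current))
    return tracebacks


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python find_tracebacks.py <path-to-log-file>
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log_file:
        for block in extract_tracebacks(log_file.read()):
            print(block)
            print("-" * 40)
```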

 

The following custom Python processor reproduces the stack trace snippet 
reliably:
{code:python}
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (
    PropertyDescriptor,
    StandardValidators,
    PropertyDependency,
    ExpressionLanguageScope,
)


###
# Test python imports
##
class ImportTestProcessor(FlowFileTransform):
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]


    class ProcessorDetails:
        version = "2.0.0-M1"
        description = """Test Imports"""
        tags = ["test"]


    def __init__(self, **kwargs):
        pass


    def transform(self, context, flowfile):
        import traceback

        try:
            # Import inside transform so that a failure can be caught here
            # and written to the FlowFile content.
            from concurrent.futures import ProcessPoolExecutor
        except Exception:
            stack_trace_str = f"Exception:\n{traceback.format_exc()}"
            return FlowFileTransformResult(
                relationship="success", contents=stack_trace_str
            )


        return FlowFileTransformResult(
            relationship="success"
        ) {code}
When this processor runs and the import fails, it writes the stack trace to 
the content of the outgoing FlowFile.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)