[ https://issues.apache.org/jira/browse/NIFI-12739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817462#comment-17817462 ]

ASF subversion and git services commented on NIFI-12739:
--------------------------------------------------------

Commit e03329e01f491b88f6f90e5285cb034e204d9363 in nifi's branch 
refs/heads/main from Alex Ethier
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=e03329e01f ]

NIFI-12739 - Import ProcessPoolExecutor to fix bug in python 3.9+ (#8357)

* NIFI-12739 Import ProcessPoolExecutor to fix bug in python 3.9+ that causes Exceptions to be raised incorrectly in multi-threaded applications (https://bugs.python.org/issue42647)

* Removed extraneous whitespace.
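
The fix the commit describes can be sketched as follows. This is a minimal illustration, not the actual NiFi source: the idea is to perform the lazy concurrent.futures import eagerly, while only the main thread is running, so that concurrent.futures.process can still call threading._register_atexit() safely. Where the import actually lives in the NiFi framework is not shown here.

```python
# Sketch of the workaround: eagerly trigger the lazy submodule import at
# startup, before any worker threads can outlive the main thread.
import sys

from concurrent.futures import ProcessPoolExecutor  # eager import at startup

# The process submodule is now fully initialized; later imports from worker
# threads become plain sys.modules cache lookups and cannot hit the
# "can't register atexit after shutdown" race.
print("concurrent.futures.process loaded:",
      "concurrent.futures.process" in sys.modules)
```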

> Python custom processor cannot import ProcessPoolExecutor
> ---------------------------------------------------------
>
>                 Key: NIFI-12739
>                 URL: https://issues.apache.org/jira/browse/NIFI-12739
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 2.0.0-M2
>            Reporter: Alex Ethier
>            Assignee: Alex Ethier
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A runtime exception is thrown when a Python custom processor tries to import ProcessPoolExecutor. This also affects libraries such as llama-index, which import ProcessPoolExecutor internally.
> My system's full stack trace (see below for a simpler stack trace):
> {code}
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
>     processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
>                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
>     processor_class = self.__load_extension_module(module_file, details.local_dependencies)
>                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
>     module_spec.loader.exec_module(module)
>   File "<frozen importlib._bootstrap_external>", line 940, in exec_module
>   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
>   File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
>     from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
>     from llama_index.indices import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
>     from llama_index.indices.base import BaseIndex
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
>     from llama_index.ingestion import run_transformations
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
>     from llama_index.ingestion.pipeline import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
>     threading._register_atexit(_python_exit)
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown
>       at py4j.Protocol.getReturnValue(Protocol.java:476)
>       at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
>       at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
>       at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
>       at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
>       at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
>       at java.base/java.lang.VirtualThread.run(VirtualThread.java:309){code}
> Note that the problem occurs on both Python 3.9 and Python 3.11, and on both the NiFi 2.0.0 release and the main branch.
>  
> The following is a simpler stack trace snippet:
> {code:python}
> Traceback (most recent call last):
>   File "/configuration_resources/python_extensions/ImportTestProcessor.py", line 26, in transform
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
>   File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
>     threading._register_atexit(_python_exit)
>   File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown{code}
> When the import fails, the Python stack trace can be hard to find in the logs, and the processor sometimes does not repeat initialization, so the stack trace is reported only once.
>  
> The following custom Python processor generates the stack trace snippet in an easily repeatable way:
> {code:python}
> from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
> from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency, ExpressionLanguageScope
> 
> 
> ###
> # Test python imports
> ###
> class ImportTestProcessor(FlowFileTransform):
>     class Java:
>         implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
> 
>     class ProcessorDetails:
>         version = "2.0.0-M1"
>         description = """Test Imports"""
>         tags = ["test"]
> 
>     def __init__(self, **kwargs):
>         pass
> 
>     def transform(self, context, flowfile):
>         import traceback
>         try:
>             from concurrent.futures import ProcessPoolExecutor
>         except Exception:
>             stack_trace_str = f"Exception:\n{traceback.format_exc()}"
>             return FlowFileTransformResult(relationship="success", contents=stack_trace_str)
>         return FlowFileTransformResult(relationship="success")
> {code}
> When this processor runs, the resulting FlowFile's content contains the stack trace.
>  
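
For reference, the underlying CPython issue (https://bugs.python.org/issue42647) can be reproduced outside NiFi with a short standalone script. This is a sketch, not NiFi code: a non-daemon thread performs the lazy import after the main thread has finished, so the module-level threading._register_atexit() call in concurrent.futures.process runs after interpreter shutdown has begun. The 0.5 s sleep is only there to let the main thread reach shutdown first. On affected interpreters (Python 3.9 and 3.11 were observed above) the worker prints the RuntimeError.

```python
import subprocess
import sys
import textwrap

# Child script: the main thread exits immediately, then a surviving
# non-daemon thread triggers the lazy import of ProcessPoolExecutor.
script = textwrap.dedent("""
    import threading
    import time

    def worker():
        time.sleep(0.5)  # let the main thread reach interpreter shutdown
        try:
            # concurrent.futures lazily imports .process here; its module
            # body calls threading._register_atexit(), which is rejected
            # once shutdown has started
            from concurrent.futures import ProcessPoolExecutor
            print("import succeeded")
        except RuntimeError as e:
            print("import failed:", e)

    threading.Thread(target=worker).start()
    # main thread ends here; threading._shutdown() marks shutdown,
    # then joins the still-running worker
""")

result = subprocess.run(
    [sys.executable, "-c", script], capture_output=True, text=True
)
print(result.stdout.strip())
```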



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
