[
https://issues.apache.org/jira/browse/NIFI-12739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814472#comment-17814472
]
Alex Ethier commented on NIFI-12739:
------------------------------------
The NiFi file
./nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/Controller.py
already contains this workaround:
{code:java}
from concurrent.futures import ThreadPoolExecutor
...
# We do not use ThreadPoolExecutor, but it must be kept here. Python introduced a bug in 3.9 that causes Exceptions to be raised
# incorrectly in multi-threaded applications (https://bugs.python.org/issue42647). This works around the bug.
# What is actually necessary is to import ThreadPoolExecutor.
# Unfortunately, IntelliJ often likes to cleanup the unused import. So we assign a bogus variable just so
# that we have some reference to ThreadPoolExecutor in order to prevent the IDE from cleaning up the import
threadpool_attrs = dir(ThreadPoolExecutor) {code}
so a similar fix may be needed for ProcessPoolExecutor.
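A minimal sketch of what the analogous workaround might look like in Controller.py (illustrative only; the variable name is made up, not a committed patch):
{code:java}
from concurrent.futures import ProcessPoolExecutor

# Import ProcessPoolExecutor eagerly, while threading atexit registration is
# still possible, so concurrent.futures.process registers its exit hook before
# the main thread finishes. As with ThreadPoolExecutor above, keep a reference
# so the IDE does not strip the seemingly unused import.
processpool_attrs = dir(ProcessPoolExecutor) {code}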
> Python custom processor cannot import ProcessPoolExecutor
> ---------------------------------------------------------
>
> Key: NIFI-12739
> URL: https://issues.apache.org/jira/browse/NIFI-12739
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 2.0.0-M2
> Reporter: Alex Ethier
> Priority: Major
>
> A runtime exception is thrown when importing ProcessPoolExecutor in a Python
> custom processor. This also breaks libraries such as llama-index, which
> import ProcessPoolExecutor internally.
> My system's full stack trace (see below for a simpler stack trace):
> {code:java}
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
>     processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
>                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
>     processor_class = self.__load_extension_module(module_file, details.local_dependencies)
>                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
>     module_spec.loader.exec_module(module)
>   File "<frozen importlib._bootstrap_external>", line 940, in exec_module
>   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
>   File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
>     from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
>     from llama_index.indices import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
>     from llama_index.indices.composability.graph import ComposableGraph
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
>     from llama_index.indices.base import BaseIndex
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
>     from llama_index.ingestion import run_transformations
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
>     from llama_index.ingestion.pipeline import (
>   File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
>     threading._register_atexit(_python_exit)
>   File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown
> 	at py4j.Protocol.getReturnValue(Protocol.java:476)
> 	at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
> 	at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
> 	at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
> 	at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
> 	at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
> 	at java.base/java.lang.VirtualThread.run(VirtualThread.java:309){code}
> Note that the problem exists with both Python 3.9 and Python 3.11, on both
> the NiFi 2.0.0 release and the main branch.
>
>
> The following is the simpler stack trace snippet mentioned above:
>
> {code:java}
> Traceback (most recent call last):
>   File "/configuration_resources/python_extensions/ImportTestProcessor.py", line 26, in transform
>     from concurrent.futures import ProcessPoolExecutor
>   File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
>   File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in __getattr__
>     from .process import ProcessPoolExecutor as pe
>   File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
>     threading._register_atexit(_python_exit)
>   File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
>     raise RuntimeError("can't register atexit after shutdown")
> RuntimeError: can't register atexit after shutdown{code}
> When the import fails, it can be hard to find the Python stack trace in the
> logs, and the processor may not repeat its initialization, so the stack trace
> is sometimes reported only once.
>
> The following custom Python processor reproduces the stack trace snippet
> above in an easily repeatable way:
> {code:java}
> from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
> from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency, ExpressionLanguageScope
>
> ###
> # Test python imports
> ##
> class ImportTestProcessor(FlowFileTransform):
>     class Java:
>         implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
>
>     class ProcessorDetails:
>         version = "2.0.0-M1"
>         description = """Test Imports"""
>         tags = ["test"]
>
>     def __init__(self, **kwargs):
>         pass
>
>     def transform(self, context, flowfile):
>         import traceback
>         stack_trace_str = ""
>         try:
>             from concurrent.futures import ProcessPoolExecutor
>         except Exception:
>             # Capture the failure so it shows up in the FlowFile content.
>             stack_trace_str = f"Exception:\n{traceback.format_exc()}"
>             return FlowFileTransformResult(
>                 relationship="success", contents=stack_trace_str
>             )
>         return FlowFileTransformResult(relationship="success") {code}
> When this processor runs, the stack trace appears in the output FlowFile's
> content.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)