bydeath opened a new issue, #306: URL: https://github.com/apache/flink-agents/issues/306
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

### Summary

The official Flink Agents example, `workflow_single_agent_example.py`, fails in YARN Application Cluster mode with `ModuleNotFoundError: No module named 'encodings'`, indicating that the embedded Python interpreter cannot find its standard library.

Crucially:

1. **The job runs successfully in Standalone Flink Cluster mode** using the exact same Python virtual environment and code.
2. A standard PyFlink DataStream example (`word_count.py`) also runs successfully in YARN mode.
3. The **YARN NodeManager machines DO NOT have a system-wide Python installation**. The entire Python interpreter, its standard library, and all dependencies are contained within the archived virtual environment (`venv.tar.gz`).

This confirms that the environment setup depends entirely on the archived file, and that the failure occurs when the Pemja-embedded interpreter attempts to load this self-contained environment.

### Error Log Snippet (taskmanager.log)

```log
2025-11-02 07:46:21,055 INFO  org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn Data client connected.
2025-11-02 07:46:21,053 WARN  org.apache.flink.runtime.taskmanager.Task [] - action-execute-operator -> Map, Map -> Sink: Print to Std. Out (1/1)#0 (0b196afe82a3bfbc08fa3a8d12240d81_90bea66de1c231edf33913ecd54406c1_0_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to find libpython
    at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:161) ~[flink-python-1.20.3.jar:1.20.3]
    at pemja.utils.CommonUtils.loadPython(CommonUtils.java:44) ~[flink-python-1.20.3.jar:1.20.3]
    at pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:365) ~[flink-python-1.20.3.jar:1.20.3]
    at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:144) ~[flink-python-1.20.3.jar:1.20.3]
    at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45) ~[flink-python-1.20.3.jar:1.20.3]
    at org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment.getInterpreter(EmbeddedPythonEnvironment.java:45) ~[flink-agents-dist-0.1.0.jar:0.1.0]
    at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:80) ~[flink-agents-dist-0.1.0.jar:0.1.0]
    at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.initPythonActionExecutor(ActionExecutionOperator.java:504) ~[flink-agents-dist-0.1.0.jar:0.1.0]
    at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.open(ActionExecutionOperator.java:247) ~[flink-agents-dist-0.1.0.jar:0.1.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.3.jar:1.20.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.3.jar:1.20.3]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: Failed to execute the command: /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python -c from find_libpython import find_libpython;print(find_libpython())
output: Python path configuration:
  PYTHONHOME = 'venv.tar.gz'
  PYTHONPATH = (not set)
  program name = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  isolated = 0
  environment = 1
  user site = 1
  import site = 1
  sys._base_executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  sys.base_prefix = 'venv.tar.gz'
  sys.base_exec_prefix = 'venv.tar.gz'
  sys.platlibdir = 'lib'
  sys.executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  sys.prefix = 'venv.tar.gz'
  sys.exec_prefix = 'venv.tar.gz'
  sys.path = [
    'venv.tar.gz/lib/python310.zip',
    'venv.tar.gz/lib/python3.10',
    'venv.tar.gz/lib/python3.10/lib-dynload',
  ]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'

Current thread 0x00007d1077b7f740 (most recent call first):
  <no Python frame>
    at pemja.utils.CommonUtils.execute(CommonUtils.java:180) ~[flink-python-1.20.3.jar:1.20.3]
    at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:157) ~[flink-python-1.20.3.jar:1.20.3]
    ... 19 more
```

* **Behavior When Not Setting PYTHONHOME**

  If the parameter `-Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz` is removed, the TaskManager fails with the exact same Python core error (printed to `taskmanager.err`), only without the detailed Java stack trace. This confirms that the core issue lies in the initialization of the embedded Python interpreter from the archived environment, regardless of whether `PYTHONHOME` is specified manually.

* **Conclusion**

  The Pemja-embedded Python interpreter (used by Flink Agents) fails to locate the standard library (the `encodings` module) of the self-contained, Conda-created virtual environment when deployed via YARN, even though the standard PyFlink Python worker process runs successfully. This suggests a path resolution failure specific to how Pemja initializes the embedded environment on YARN.
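The Python side of the failure can be reproduced outside of Flink and YARN: the log shows the interpreter starting with `PYTHONHOME` set to the bare archive name `venv.tar.gz`, which CPython treats as a relative prefix it cannot resolve. A minimal sketch of that startup condition — using the local interpreter as a stand-in for `venv.tar.gz/bin/python` (the relative `PYTHONHOME` value is the point, not the specific binary):

```python
import os
import subprocess
import sys

# Recreate the container's startup environment from the log:
# PYTHONHOME = 'venv.tar.gz' (a relative, unresolvable prefix)
# and PYTHONPATH = (not set).
env = dict(os.environ, PYTHONHOME="venv.tar.gz")
env.pop("PYTHONPATH", None)

# An interpreter started this way aborts before executing -c, with the
# same init_fs_encoding / "No module named 'encodings'" fatal error.
proc = subprocess.run(
    [sys.executable, "-c", "print('started')"],
    env=env,
    capture_output=True,
    text=True,
)

print(proc.returncode)  # nonzero: startup failed
print(proc.stderr)      # contains: ModuleNotFoundError: No module named 'encodings'
```

This suggests the embedded interpreter would work if Pemja (or the Flink Agents operator) resolved `PYTHONHOME` to the absolute path of the extracted archive inside the container, the way the standard PyFlink worker process apparently does.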
### How to reproduce

```bash
./flink-1.20.3/bin/flink run-application -t yarn-application \
    -Dcontainerized.master.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
    -Dcontainerized.taskmanager.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
    -Djobmanager.memory.process.size=1024m \
    -Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz \
    -Dtaskmanager.memory.process.size=1024m \
    -Dyarn.application.name=flink-agents-workflow \
    -Dyarn.ship-files=./shipfiles \
    -pyarch shipfiles/venv.tar.gz \
    -pyclientexec venv.tar.gz/bin/python \
    -pyexec venv.tar.gz/bin/python \
    -pyfs shipfiles \
    -pym workflow_single_agent_example
```

### Version and environment

* **Flink Version:** 1.20.3
* **Flink Agents Version:** 0.1.0
* **Deployment Mode:** YARN Application Cluster (`-t yarn-application`)
* **Python Version (in venv):** Python 3.10
* **Python Virtual Environment:** Created and archived using **Conda** (`venv.tar.gz`)
* **YARN Setup:** NodeManagers lack a system Python; the environment is self-contained in the archive.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
