josix commented on issue #43354: URL: https://github.com/apache/airflow/issues/43354#issuecomment-2437786227
I've investigated the root cause of the issue. The problem is in the rendered Python script (shown below) that runs inside the virtual environment. That script is generated [through Jinja templating](https://github.com/apache/airflow/blob/9e8fb40ba71d9bd07ae40503d5e7aa93ef042bb0/airflow/utils/python_virtualenv_script.jinja2).

Specifically, in the `# Script` section, where the DAG author's code goes, we're using [`inspect.getsource(obj)`](https://github.com/apache/airflow/blob/9e8fb40ba71d9bd07ae40503d5e7aa93ef042bb0/airflow/operators/python.py#L510) to generate the code. However, `inspect.getsource` also captures the task decorator (`@task.skip_if(lambda x: False)` here) along with the callable itself, and that is what causes the bug. Nothing in the rendered script imports or defines `task`, so that decorator line has no meaning there.

```python
from __future__ import annotations

import pickle
import sys

if sys.version_info >= (3,6):
    try:
        from airflow.plugins_manager import integrate_macros_plugins
        integrate_macros_plugins()
    except ImportError:
        pass

# Script
@task.skip_if(lambda x: False)
def potentially_skipped_task():
    import requests
    import time
    print("This task should be skipped, but it might run anyway!")
    response = requests.get("https://example.com")
    print(f"Response status code: {response.status_code}")

# monkey patching for the cases when python_callable is part of the dag module.
import types

unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354 = types.ModuleType("unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354")
unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354.potentially_skipped_task = potentially_skipped_task
sys.modules["unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354"] = unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354

arg_dict = {"args": [], "kwargs": {}}

# Read string args
with open(sys.argv[3], "r") as file:
    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))

try:
    res = potentially_skipped_task(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
    with open(sys.argv[4], "w") as file:
        file.write(str(e))
    raise

# Write output
with open(sys.argv[2], "wb") as file:
    if res is not None:
        pickle.dump(res, file)
```
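Below is a small standalone sketch of the mechanism described above, outside Airflow. It shows that `inspect.getsource` returns the decorator line together with the function body. It also shows one possible way to strip decorators with the `ast` module.

Some assumptions to flag:
- `skip_if` is a hypothetical stand-in for `@task.skip_if`.
- `load_module_from_source` and `source_without_decorators` are illustrative helpers, not Airflow APIs.
- The `ast`-based stripping is only one option and not necessarily the fix Airflow will adopt.
- `ast.unparse` needs Python 3.9+.

```python
import ast
import importlib.util
import inspect
import tempfile
import textwrap
from pathlib import Path

# Hypothetical DAG-file snippet: `skip_if` stands in for @task.skip_if
# and simply returns the decorated function unchanged.
DAG_SOURCE = textwrap.dedent('''
    def skip_if(condition):
        def wrap(fn):
            return fn
        return wrap

    @skip_if(lambda x: False)
    def potentially_skipped_task():
        print("This task should be skipped, but it might run anyway!")
''')


def load_module_from_source(name: str, source: str, directory: str):
    """Hypothetical helper: write `source` to a real file and import it,
    so that inspect.getsource can read the code back from disk."""
    path = Path(directory) / f"{name}.py"
    path.write_text(source)
    spec = importlib.util.spec_from_file_location(name, str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def source_without_decorators(func) -> str:
    """Hypothetical helper: return func's source with every decorator removed."""
    tree = ast.parse(textwrap.dedent(inspect.getsource(func)))
    tree.body[0].decorator_list = []  # drop all @decorator lines
    return ast.unparse(tree)  # requires Python 3.9+


with tempfile.TemporaryDirectory() as tmp:
    dag_module = load_module_from_source("example_dag", DAG_SOURCE, tmp)
    task_fn = dag_module.potentially_skipped_task
    # Roughly what ends up in the "# Script" section today: decorator included.
    raw_source = inspect.getsource(task_fn)
    # A decorator-free version of the same function.
    clean_source = source_without_decorators(task_fn)

print(raw_source)
print(clean_source)
```

One trade-off of this approach: `ast.unparse` drops comments and normalizes formatting. Removing *every* decorator would also drop decorators the author actually wants applied inside the virtualenv. A real fix would probably need to target only the Airflow task decorators.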
