josix commented on issue #43354:
URL: https://github.com/apache/airflow/issues/43354#issuecomment-2437786227

   I've investigated the root cause of the issue. The problem occurs in the rendered Python script (shown below), which is generated [through Jinja templating](https://github.com/apache/airflow/blob/9e8fb40ba71d9bd07ae40503d5e7aa93ef042bb0/airflow/utils/python_virtualenv_script.jinja2) and executed in the virtual environment. Specifically, in the Script section (where DAG authors define their code), we use [`inspect.getsource(obj)`](https://github.com/apache/airflow/blob/9e8fb40ba71d9bd07ae40503d5e7aa93ef042bb0/airflow/operators/python.py#L510) to generate the code. However, this call captures the task decorator along with the callable function, so the decorator line ends up in the rendered script, which is what causes the bug.
   
   ```python
   from __future__ import annotations
   
   import pickle
   import sys
   
   
    
   if sys.version_info >= (3,6):
       try:
           from airflow.plugins_manager import integrate_macros_plugins
           integrate_macros_plugins()
       except ImportError:
           
           pass
   
   
   # Script
   @task.skip_if(lambda x: False)
   def potentially_skipped_task():
       import requests
       import time
   
       print("This task should be skipped, but it might run anyway!")
       response = requests.get("https://example.com")
       print(f"Response status code: {response.status_code}")
   
   
   # monkey patching for the cases when python_callable is part of the dag module.
   
   
   import types
   
   unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354 = types.ModuleType("unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354")

   unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354.potentially_skipped_task = potentially_skipped_task

   sys.modules["unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354"] = unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354
   
   
   
   
   arg_dict = {"args": [], "kwargs": {}}
   
   
   # Read string args
   with open(sys.argv[3], "r") as file:
       virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
   
   
   
   try:
       res = potentially_skipped_task(*arg_dict["args"], **arg_dict["kwargs"])
   except Exception as e:
       with open(sys.argv[4], "w") as file:
           file.write(str(e))
       raise
   
   # Write output
   with open(sys.argv[2], "wb") as file:
       if res is not None:
           pickle.dump(res, file)
   ```
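
To make the `inspect.getsource` behaviour concrete, here is a minimal standalone sketch that reproduces it outside Airflow. The `skip_if` decorator below is a hypothetical no-op stand-in, not Airflow's `task.skip_if`, and the `load_module` helper and `demo_dag` module name are illustrative assumptions. The source is written to a temporary file first because `inspect` needs a real file to read from.

```python
import importlib.util
import inspect
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in for a DAG file. `skip_if` is a dummy decorator that
# returns the function unchanged; it is NOT Airflow's `task.skip_if`.
DAG_SOURCE = textwrap.dedent(
    '''
    def skip_if(condition):
        def decorator(func):
            return func
        return decorator


    @skip_if(lambda x: False)
    def potentially_skipped_task():
        return "ran"
    '''
)


def load_module(source, name="demo_dag"):
    """Write `source` to a temporary .py file and import it as a module."""
    path = Path(tempfile.mkdtemp()) / f"{name}.py"
    path.write_text(source)
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


module = load_module(DAG_SOURCE)

# getsource returns the decorator line together with the function body,
# which mirrors what ends up in the Script section of the rendered template.
captured = inspect.getsource(module.potentially_skipped_task)
print(captured)
```

The printed text starts at the `@skip_if(...)` line rather than at `def`, so any script built directly from it will try to evaluate the decorator again inside the virtual environment.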


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

