phi-friday commented on code in PR #41039:
URL: https://github.com/apache/airflow/pull/41039#discussion_r1709166971


##########
airflow/utils/python_virtualenv_script.jinja2:
##########
@@ -64,6 +64,20 @@ with open(sys.argv[3], "r") as file:
     virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
 {% endif %}
 
+{% if use_airflow_context | default(false) -%}
+if len(sys.argv) > 5:
+    import json
+    from types import ModuleType
+
+    class _MockPython(ModuleType):
+        @staticmethod
+        def get_current_context():
+            with open(sys.argv[5]) as file:
+                return json.load(file)

Review Comment:
   When `BaseSerialization` serializes the `TaskInstance`, it appears to fall
   back to the object's string representation, as the log below shows:
   
   <details>
   <summary>log</summary>
   
   ```log
   8a0a679652d1
   *** Found local files:
   ***   * /root/airflow/logs/dag_id=context_sample/run_id=manual__2024-08-08T10:23:57.061809+00:00/task_id=print_context/attempt=1.log
   [2024-08-08, 10:23:57 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
   [2024-08-08, 10:23:57 UTC] {process_utils.py:183} INFO - Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venvu655ft58 --system-site-packages --python=python
   [2024-08-08, 10:23:57 UTC] {process_utils.py:187} INFO - Output:
   [2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - created virtual environment CPython3.8.19.final.0-64 in 257ms
   [2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO -   creator CPython3Posix(dest=/tmp/venvu655ft58, clear=False, no_vcs_ignore=False, global=True)
   [2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
   [2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO -     added seed packages: pip==24.1, setuptools==70.1.0, wheel==0.43.0
   [2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO -   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
   [2024-08-08, 10:23:58 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvu655ft58/bin/pip install -r /tmp/venvu655ft58/requirements.txt
   [2024-08-08, 10:23:58 UTC] {process_utils.py:187} INFO - Output:
   [2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO - 
   [2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO - [notice] A new release of pip is available: 24.1 -> 24.2
   [2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO - [notice] To update, run: python -m pip install --upgrade pip
   [2024-08-08, 10:24:02 UTC] {python.py:576} INFO - serializable_context: 
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.TASK_CONTEXT: 'task_context'>,
    <Encoding.VAR: '__var'>: {'conf': '<***.configuration.AirflowConfigParser '
                                      'object at 0xe4886fce59d0>',
                              'conn': 'None',
                              'dag': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DAG: 'dag'>,
                                      <Encoding.VAR: '__var'>: {'_dag_id': 
'context_sample',
                                                                
'_default_view': 'grid',
                                                                
'_processor_dags_folder': '/files/dags',
                                                                '_task_group': 
{'_group_id': None,
                                                                                
'children': {'print_context': (<DagAttributeTypes.OP: 'operator'>,
                                                                                
                               'print_context')},
                                                                                
'downstream_group_ids': [],
                                                                                
'downstream_task_ids': [],
                                                                                
'prefix_group_id': True,
                                                                                
'tooltip': '',
                                                                                
'ui_color': 'CornflowerBlue',
                                                                                
'ui_fgcolor': '#000',
                                                                                
'upstream_group_ids': [],
                                                                                
'upstream_task_ids': []},
                                                                'catchup': 
False,
                                                                
'dag_dependencies': [],
                                                                'edge_info': {},
                                                                'fileloc': 
'/files/dags/context_sample.py',
                                                                'params': [],
                                                                
'schedule_interval': None,
                                                                'start_date': 
1640995200.0,
                                                                'tasks': 
[{<Encoding.TYPE: '__type'>: <DagAttributeTypes.OP: 'operator'>,
                                                                           
<Encoding.VAR: '__var'>: {'_is_empty': False,
                                                                                
                     '_log_config_logger_name': '***.task.operators',
                                                                                
                     '_needs_expansion': False,
                                                                                
                     '_operator_name': '@task.virtualenv',
                                                                                
                     '_task_module': '***.decorators.python_virtualenv',
                                                                                
                     '_task_type': '_PythonVirtualenvDecoratedOperator',
                                                                                
                     'downstream_task_ids': [],
                                                                                
                     'is_setup': False,
                                                                                
                     'is_teardown': False,
                                                                                
                     'on_failure_fail_dagrun': False,
                                                                                
                     'op_args': (),
                                                                                
                     'op_kwargs': {},
                                                                                
                     'pool': 'default_pool',
                                                                                
                     'requirements': [],
                                                                                
                     'start_from_trigger': False,
                                                                                
                     'start_trigger_args': None,
                                                                                
                     'task_id': 'print_context',
                                                                                
                     'template_ext': ['.txt'],
                                                                                
                     'template_fields': ['index_urls',
                                                                                
                                         'requirements',
                                                                                
                                         'op_kwargs',
                                                                                
                                         'templates_dict',
                                                                                
                                         'op_args',
                                                                                
                                         'venv_cache_path'],
                                                                                
                     'template_fields_renderers': {'op_args': 'py',
                                                                                
                                                   'op_kwargs': 'py',
                                                                                
                                                   'templates_dict': 'json'},
                                                                                
                     'ui_color': '#ffefeb',
                                                                                
                     'ui_fgcolor': '#000',
                                                                                
                     'weight_rule': 'downstream'}}],
                                                                'timezone': 
'UTC'}},
                              'dag_run': '<DagRun context_sample @ 2024-08-08 '
                                         '10:23:57.061809+00:00: '
                                         
'manual__2024-08-08T10:23:57.061809+00:00, '
                                         'state:running, queued_at: 2024-08-08 '
                                         '10:23:57.075396+00:00. externally '
                                         'triggered: True>',
                              'data_interval_end': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DATETIME: 'datetime'>,
                                                    <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'data_interval_start': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                      <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'ds': '2024-08-08',
                              'ds_nodash': '20240808',
                              'execution_date': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DATETIME: 'datetime'>,
                                                 <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'expanded_ti_count': None,
                              'inlet_events': 'InletEventsAccessors(_inlets=[], 
'
                                              '_datasets={}, 
_dataset_aliases={}, '
                                              
'_session=<sqlalchemy.orm.session.Session '
                                              'object at 0xe48866870a30>)',
                              'inlets': [],
                              'logical_date': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DATETIME: 'datetime'>,
                                               <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'macros': "<module '***.macros' from "
                                        "'/opt/***/***/macros/__init__.py'>",
                              'map_index_template': None,
                              'next_ds': '2024-08-08',
                              'next_ds_nodash': '20240808',
                              'next_execution_date': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                      <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'outlet_events': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DATASET_EVENT_ACCESSORS: 'dataset_event_accessors'>,
                                                <Encoding.VAR: '__var'>: 
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>,
                                                                          
<Encoding.VAR: '__var'>: {}}},
                              'outlets': [],
                              'params': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DICT: 'dict'>,
                                         <Encoding.VAR: '__var'>: {}},
                              'prev_data_interval_end_success': 
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                                 <Encoding.VAR: 
'__var'>: 1723111433.152088},
                              'prev_data_interval_start_success': 
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                                   
<Encoding.VAR: '__var'>: 1723111433.152088},
                              'prev_ds': '2024-08-08',
                              'prev_ds_nodash': '20240808',
                              'prev_end_date_success': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                        <Encoding.VAR: 
'__var'>: 1723111440.429138},
                              'prev_execution_date': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                      <Encoding.VAR: '__var'>: 
1723112637.061809},
                              'prev_execution_date_success': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                              <Encoding.VAR: 
'__var'>: 1723111433.152088},
                              'prev_start_date_success': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
                                                          <Encoding.VAR: 
'__var'>: 1723111433.284149},
                              'run_id': 
'manual__2024-08-08T10:23:57.061809+00:00',
                              'task': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.OP: 'operator'>,
                                       <Encoding.VAR: '__var'>: {'_is_empty': 
False,
                                                                 
'_log_config_logger_name': '***.task.operators',
                                                                 
'_needs_expansion': False,
                                                                 
'_operator_name': '@task.virtualenv',
                                                                 
'_task_module': '***.decorators.python_virtualenv',
                                                                 '_task_type': 
'_PythonVirtualenvDecoratedOperator',
                                                                 
'downstream_task_ids': [],
                                                                 'is_setup': 
False,
                                                                 'is_teardown': 
False,
                                                                 
'on_failure_fail_dagrun': False,
                                                                 'op_args': (),
                                                                 'op_kwargs': 
{},
                                                                 'pool': 
'default_pool',
                                                                 
'requirements': [],
                                                                 
'start_from_trigger': False,
                                                                 
'start_trigger_args': None,
                                                                 'task_id': 
'print_context',
                                                                 
'template_ext': ['.txt'],
                                                                 
'template_fields': ['index_urls',
                                                                                
     'requirements',
                                                                                
     'op_kwargs',
                                                                                
     'templates_dict',
                                                                                
     'op_args',
                                                                                
     'venv_cache_path'],
                                                                 
'template_fields_renderers': {'op_args': 'py',
                                                                                
               'op_kwargs': 'py',
                                                                                
               'templates_dict': 'json'},
                                                                 'ui_color': 
'#ffefeb',
                                                                 'ui_fgcolor': 
'#000',
                                                                 'weight_rule': 
'downstream'}},
                              'task_instance': '<TaskInstance: '
                                               'context_sample.print_context '
                                               
'manual__2024-08-08T10:23:57.061809+00:00 '
                                               '[running]>',
                              'task_instance_key_str': 
'context_sample__print_context__20240808',
                              'test_mode': False,
                              'ti': '<TaskInstance: 
context_sample.print_context '
                                    'manual__2024-08-08T10:23:57.061809+00:00 '
                                    '[running]>',
                              'tomorrow_ds': '2024-08-09',
                              'tomorrow_ds_nodash': '20240809',
                              'triggering_dataset_events': {<Encoding.TYPE: 
'__type'>: <DagAttributeTypes.DICT: 'dict'>,
                                                            <Encoding.VAR: 
'__var'>: {}},
                              'ts': '2024-08-08T10:23:57.061809+00:00',
                              'ts_nodash': '20240808T102357',
                              'ts_nodash_with_tz': 
'20240808T102357.061809+0000',
                              'var': {<Encoding.TYPE: '__type'>: 
<DagAttributeTypes.DICT: 'dict'>,
                                      <Encoding.VAR: '__var'>: {'json': 'None',
                                                                'value': 
'None'}},
                              'yesterday_ds': '2024-08-07',
                              'yesterday_ds_nodash': '20240807'}}
   [2024-08-08, 10:24:02 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvu655ft58/bin/python /tmp/venv-callczu169m3/script.py /tmp/venv-callczu169m3/script.in /tmp/venv-callczu169m3/script.out /tmp/venv-callczu169m3/string_args.txt /tmp/venv-callczu169m3/termination.log /tmp/venv-callczu169m3/***_context.b
   [2024-08-08, 10:24:02 UTC] {process_utils.py:187} INFO - Output:
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - [2024-08-08T10:24:05.868+0000] {serialized_objects.py:804} INFO - task_instance: <TaskInstance: context_sample.print_context manual__2024-08-08T10:23:57.061809+00:00 [running]>, type: <class 'str'>
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - Traceback (most recent call last):
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 73, in <module>
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     res = print_context(*arg_dict["args"], **arg_dict["kwargs"])
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 23, in print_context
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     context = get_current_context()
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 62, in get_current_context
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     return BaseSerialization.deserialize(context, use_pydantic_models=True)
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/opt/***/***/serialization/serialized_objects.py", line 805, in deserialize
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     d["task"] = d["task_instance"].task  # todo: add `_encode` of Operator so we don't need this
   [2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - AttributeError: 'str' object has no attribute 'task'
   [2024-08-08, 10:24:06 UTC] {taskinstance.py:3285} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/decorators/base.py", line 266, in execute
       return_value = super().execute(context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 504, in execute
       return super().execute(context=serializable_context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 239, in execute
       return_value = self.execute_callable()
     File "/opt/airflow/airflow/operators/python.py", line 878, in execute_callable
       result = self._execute_python_callable_in_subprocess(python_path)
     File "/opt/airflow/airflow/operators/python.py", line 600, in _execute_python_callable_in_subprocess
       raise AirflowException(error_msg) from None
   airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
   'str' object has no attribute 'task'
   [2024-08-08, 10:24:06 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=context_sample, task_id=print_context, run_id=manual__2024-08-08T10:23:57.061809+00:00, execution_date=20240808T102357, start_date=20240808T102357, end_date=20240808T102406
   [2024-08-08, 10:24:06 UTC] {taskinstance.py:340} ▼ Post task execution logs
   [2024-08-08, 10:24:06 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 62 for task print_context (Process returned non-zero exit status 1.
   'str' object has no attribute 'task'; 358)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 483, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
       return ti._run_raw_task(
     File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2979, in _run_raw_task
       return _run_raw_task(
     File "/opt/airflow/airflow/models/taskinstance.py", line 273, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 3133, in _execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 3157, in _execute_task
       return _execute_task(self, context, task_orig)
     File "/opt/airflow/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/decorators/base.py", line 266, in execute
       return_value = super().execute(context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 504, in execute
       return super().execute(context=serializable_context)
     File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/python.py", line 239, in execute
       return_value = self.execute_callable()
     File "/opt/airflow/airflow/operators/python.py", line 878, in execute_callable
       result = self._execute_python_callable_in_subprocess(python_path)
     File "/opt/airflow/airflow/operators/python.py", line 600, in _execute_python_callable_in_subprocess
       raise AirflowException(error_msg) from None
   airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
   'str' object has no attribute 'task'
   [2024-08-08, 10:24:06 UTC] {local_task_job_runner.py:261} INFO - Task exited with return code 1
   [2024-08-08, 10:24:06 UTC] {taskinstance.py:3875} INFO - 0 downstream tasks scheduled from follow-on schedule check
   [2024-08-08, 10:24:06 UTC] {local_task_job_runner.py:240} ▲▲▲ Log group end
   ```
   
   </details>
   
   Deserialization then fails at the following line, which expects
   `task_instance` to be an object with a `.task` attribute rather than a `str`:
   
https://github.com/apache/airflow/blob/ec0e9f28eafe7484887b21ded0c7a78bfc590ce0/airflow/serialization/serialized_objects.py#L804
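
For illustration, here is a minimal sketch of what I think is happening. It does not use Airflow at all: `FakeTaskInstance`, `naive_serialize`, and `naive_deserialize` are hypothetical stand-ins, and the `str()` fallback is my assumption based on the log above, not a confirmed description of `BaseSerialization`.

```python
import json


class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance; not the Airflow class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.task = f"<Task {task_id}>"

    def __repr__(self):
        return f"<TaskInstance: context_sample.{self.task_id} [running]>"


def naive_serialize(context):
    # Assumption: objects the serializer does not know fall back to str().
    return json.dumps(context, default=str)


def naive_deserialize(payload):
    d = json.loads(payload)
    # Same shape as the failing line: assumes a rich object, gets a str.
    d["task"] = d["task_instance"].task
    return d


payload = naive_serialize({"task_instance": FakeTaskInstance("print_context")})
restored = json.loads(payload)
print(type(restored["task_instance"]))

try:
    naive_deserialize(payload)
    error_message = ""
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

If this matches the real behavior, the fix probably has to happen on the serializing side (encoding `TaskInstance` as a structured object), since the string form no longer carries the task.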


