phi-friday commented on code in PR #41039:
URL: https://github.com/apache/airflow/pull/41039#discussion_r1709166971
##########
airflow/utils/python_virtualenv_script.jinja2:
##########
@@ -64,6 +64,20 @@ with open(sys.argv[3], "r") as file:
virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
{% endif %}
+{% if use_airflow_context | default(false) -%}
+if len(sys.argv) > 5:
+    import json
+    from types import ModuleType
+
+    class _MockPython(ModuleType):
+        @staticmethod
+        def get_current_context():
+            with open(sys.argv[5]) as file:
+                return json.load(file)
Review Comment:
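When `BaseSerialization` serializes the `TaskInstance`, it seems to fall back to the object's string representation, so `task_instance` and `ti` end up as plain strings (see the `type: <class 'str'>` line near the end of the log):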
<details>
<summary>log</summary>
```log
8a0a679652d1
*** Found local files:
*** * /root/airflow/logs/dag_id=context_sample/run_id=manual__2024-08-08T10:23:57.061809+00:00/task_id=print_context/attempt=1.log
[2024-08-08, 10:23:57 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-08, 10:23:57 UTC] {process_utils.py:183} INFO - Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venvu655ft58 --system-site-packages --python=python
[2024-08-08, 10:23:57 UTC] {process_utils.py:187} INFO - Output:
[2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - created virtual environment CPython3.8.19.final.0-64 in 257ms
[2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - creator CPython3Posix(dest=/tmp/venvu655ft58, clear=False, no_vcs_ignore=False, global=True)
[2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
[2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - added seed packages: pip==24.1, setuptools==70.1.0, wheel==0.43.0
[2024-08-08, 10:23:58 UTC] {process_utils.py:191} INFO - activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
[2024-08-08, 10:23:58 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvu655ft58/bin/pip install -r /tmp/venvu655ft58/requirements.txt
[2024-08-08, 10:23:58 UTC] {process_utils.py:187} INFO - Output:
[2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO -
[2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO - [notice] A new release of pip is available: 24.1 -> 24.2
[2024-08-08, 10:24:02 UTC] {process_utils.py:191} INFO - [notice] To update, run: python -m pip install --upgrade pip
[2024-08-08, 10:24:02 UTC] {python.py:576} INFO - serializable_context:
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.TASK_CONTEXT: 'task_context'>,
<Encoding.VAR: '__var'>: {'conf': '<***.configuration.AirflowConfigParser '
'object at 0xe4886fce59d0>',
'conn': 'None',
'dag': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DAG: 'dag'>,
<Encoding.VAR: '__var'>: {'_dag_id':
'context_sample',
'_default_view': 'grid',
'_processor_dags_folder': '/files/dags',
'_task_group':
{'_group_id': None,
'children': {'print_context': (<DagAttributeTypes.OP: 'operator'>,
'print_context')},
'downstream_group_ids': [],
'downstream_task_ids': [],
'prefix_group_id': True,
'tooltip': '',
'ui_color': 'CornflowerBlue',
'ui_fgcolor': '#000',
'upstream_group_ids': [],
'upstream_task_ids': []},
'catchup':
False,
'dag_dependencies': [],
'edge_info': {},
'fileloc':
'/files/dags/context_sample.py',
'params': [],
'schedule_interval': None,
'start_date':
1640995200.0,
'tasks':
[{<Encoding.TYPE: '__type'>: <DagAttributeTypes.OP: 'operator'>,
<Encoding.VAR: '__var'>: {'_is_empty': False,
'_log_config_logger_name': '***.task.operators',
'_needs_expansion': False,
'_operator_name': '@task.virtualenv',
'_task_module': '***.decorators.python_virtualenv',
'_task_type': '_PythonVirtualenvDecoratedOperator',
'downstream_task_ids': [],
'is_setup': False,
'is_teardown': False,
'on_failure_fail_dagrun': False,
'op_args': (),
'op_kwargs': {},
'pool': 'default_pool',
'requirements': [],
'start_from_trigger': False,
'start_trigger_args': None,
'task_id': 'print_context',
'template_ext': ['.txt'],
'template_fields': ['index_urls',
'requirements',
'op_kwargs',
'templates_dict',
'op_args',
'venv_cache_path'],
'template_fields_renderers': {'op_args': 'py',
'op_kwargs': 'py',
'templates_dict': 'json'},
'ui_color': '#ffefeb',
'ui_fgcolor': '#000',
'weight_rule': 'downstream'}}],
'timezone':
'UTC'}},
'dag_run': '<DagRun context_sample @ 2024-08-08 '
'10:23:57.061809+00:00: '
'manual__2024-08-08T10:23:57.061809+00:00, '
'state:running, queued_at: 2024-08-08 '
'10:23:57.075396+00:00. externally '
'triggered: True>',
'data_interval_end': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'data_interval_start': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'ds': '2024-08-08',
'ds_nodash': '20240808',
'execution_date': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'expanded_ti_count': None,
'inlet_events': 'InletEventsAccessors(_inlets=[], '
'_datasets={}, _dataset_aliases={}, '
'_session=<sqlalchemy.orm.session.Session '
'object at 0xe48866870a30>)',
'inlets': [],
'logical_date': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'macros': "<module '***.macros' from "
"'/opt/***/***/macros/__init__.py'>",
'map_index_template': None,
'next_ds': '2024-08-08',
'next_ds_nodash': '20240808',
'next_execution_date': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'outlet_events': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATASET_EVENT_ACCESSORS: 'dataset_event_accessors'>,
<Encoding.VAR: '__var'>:
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>,
<Encoding.VAR: '__var'>: {}}},
'outlets': [],
'params': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DICT: 'dict'>,
<Encoding.VAR: '__var'>: {}},
'prev_data_interval_end_success':
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>: 1723111433.152088},
'prev_data_interval_start_success':
{<Encoding.TYPE: '__type'>: <DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>: 1723111433.152088},
'prev_ds': '2024-08-08',
'prev_ds_nodash': '20240808',
'prev_end_date_success': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723111440.429138},
'prev_execution_date': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723112637.061809},
'prev_execution_date_success': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723111433.152088},
'prev_start_date_success': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DATETIME: 'datetime'>,
<Encoding.VAR: '__var'>:
1723111433.284149},
'run_id':
'manual__2024-08-08T10:23:57.061809+00:00',
'task': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.OP: 'operator'>,
<Encoding.VAR: '__var'>: {'_is_empty':
False,
'_log_config_logger_name': '***.task.operators',
'_needs_expansion': False,
'_operator_name': '@task.virtualenv',
'_task_module': '***.decorators.python_virtualenv',
'_task_type':
'_PythonVirtualenvDecoratedOperator',
'downstream_task_ids': [],
'is_setup':
False,
'is_teardown':
False,
'on_failure_fail_dagrun': False,
'op_args': (),
'op_kwargs':
{},
'pool':
'default_pool',
'requirements': [],
'start_from_trigger': False,
'start_trigger_args': None,
'task_id':
'print_context',
'template_ext': ['.txt'],
'template_fields': ['index_urls',
'requirements',
'op_kwargs',
'templates_dict',
'op_args',
'venv_cache_path'],
'template_fields_renderers': {'op_args': 'py',
'op_kwargs': 'py',
'templates_dict': 'json'},
'ui_color':
'#ffefeb',
'ui_fgcolor':
'#000',
'weight_rule':
'downstream'}},
'task_instance': '<TaskInstance: '
'context_sample.print_context '
'manual__2024-08-08T10:23:57.061809+00:00 '
'[running]>',
'task_instance_key_str':
'context_sample__print_context__20240808',
'test_mode': False,
'ti': '<TaskInstance: context_sample.print_context '
'manual__2024-08-08T10:23:57.061809+00:00 '
'[running]>',
'tomorrow_ds': '2024-08-09',
'tomorrow_ds_nodash': '20240809',
'triggering_dataset_events': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DICT: 'dict'>,
<Encoding.VAR: '__var'>: {}},
'ts': '2024-08-08T10:23:57.061809+00:00',
'ts_nodash': '20240808T102357',
'ts_nodash_with_tz':
'20240808T102357.061809+0000',
'var': {<Encoding.TYPE: '__type'>:
<DagAttributeTypes.DICT: 'dict'>,
<Encoding.VAR: '__var'>: {'json': 'None',
'value':
'None'}},
'yesterday_ds': '2024-08-07',
'yesterday_ds_nodash': '20240807'}}
[2024-08-08, 10:24:02 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvu655ft58/bin/python /tmp/venv-callczu169m3/script.py /tmp/venv-callczu169m3/script.in /tmp/venv-callczu169m3/script.out /tmp/venv-callczu169m3/string_args.txt /tmp/venv-callczu169m3/termination.log /tmp/venv-callczu169m3/***_context.b
[2024-08-08, 10:24:02 UTC] {process_utils.py:187} INFO - Output:
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - [2024-08-08T10:24:05.868+0000] {serialized_objects.py:804} INFO - task_instance: <TaskInstance: context_sample.print_context manual__2024-08-08T10:23:57.061809+00:00 [running]>, type: <class 'str'>
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - Traceback (most recent call last):
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 73, in <module>
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     res = print_context(*arg_dict["args"], **arg_dict["kwargs"])
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 23, in print_context
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     context = get_current_context()
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/tmp/venv-callczu169m3/script.py", line 62, in get_current_context
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     return BaseSerialization.deserialize(context, use_pydantic_models=True)
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -   File "/opt/***/***/serialization/serialized_objects.py", line 805, in deserialize
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO -     d["task"] = d["task_instance"].task # todo: add `_encode` of Operator so we don't need this
[2024-08-08, 10:24:05 UTC] {process_utils.py:191} INFO - AttributeError: 'str' object has no attribute 'task'
[2024-08-08, 10:24:06 UTC] {taskinstance.py:3285} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/decorators/base.py", line 266, in execute
    return_value = super().execute(context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 504, in execute
    return super().execute(context=serializable_context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 239, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow/operators/python.py", line 878, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path)
  File "/opt/airflow/airflow/operators/python.py", line 600, in _execute_python_callable_in_subprocess
    raise AirflowException(error_msg) from None
airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
'str' object has no attribute 'task'
[2024-08-08, 10:24:06 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=context_sample, task_id=print_context, run_id=manual__2024-08-08T10:23:57.061809+00:00, execution_date=20240808T102357, start_date=20240808T102357, end_date=20240808T102406
[2024-08-08, 10:24:06 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2024-08-08, 10:24:06 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 62 for task print_context (Process returned non-zero exit status 1.
'str' object has no attribute 'task'; 358)
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
  File "/opt/airflow/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 2979, in _run_raw_task
    return _run_raw_task(
  File "/opt/airflow/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 3133, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/airflow/airflow/models/taskinstance.py", line 3157, in _execute_task
    return _execute_task(self, context, task_orig)
  File "/opt/airflow/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/decorators/base.py", line 266, in execute
    return_value = super().execute(context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 504, in execute
    return super().execute(context=serializable_context)
  File "/opt/airflow/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/python.py", line 239, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow/operators/python.py", line 878, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path)
  File "/opt/airflow/airflow/operators/python.py", line 600, in _execute_python_callable_in_subprocess
    raise AirflowException(error_msg) from None
airflow.exceptions.AirflowException: Process returned non-zero exit status 1.
'str' object has no attribute 'task'
[2024-08-08, 10:24:06 UTC] {local_task_job_runner.py:261} INFO - Task exited with return code 1
[2024-08-08, 10:24:06 UTC] {taskinstance.py:3875} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-08-08, 10:24:06 UTC] {local_task_job_runner.py:240} ▲▲▲ Log group end
```
</details>
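To make the failure mode concrete, here is a minimal, self-contained sketch of an encoder that wraps known types in a `__type`/`__var` envelope (mimicking the shape visible in the log) and falls back to `str()` for anything else. `FakeTaskInstance`, `encode` and `degraded_keys` are hypothetical illustrations, not Airflow's `BaseSerialization` API, and `degraded_keys` is only a heuristic that looks for `<...>` repr strings.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


class FakeTaskInstance:
    """Hypothetical stand-in for an object the encoder has no rule for."""

    def __init__(self, dag_id: str, task_id: str, run_id: str) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id
        self.task = "print_context operator"  # the attribute the decoder later needs

    def __repr__(self) -> str:
        return f"<TaskInstance: {self.dag_id}.{self.task_id} {self.run_id} [running]>"


def encode(value: Any) -> Any:
    """Toy encoder: typed envelope for known types, str() fallback for everything else."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, datetime):
        return {"__type": "datetime", "__var": value.timestamp()}
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: encode(v) for k, v in value.items()}}
    # Fallback: only the repr survives; the object and its attributes are lost.
    return str(value)


def degraded_keys(encoded_context: dict[str, Any]) -> list[str]:
    """Heuristic: keys whose encoded value looks like a repr such as '<TaskInstance: ...>'."""
    return sorted(
        key
        for key, value in encoded_context.items()
        if isinstance(value, str) and value.startswith("<") and value.endswith(">")
    )


ti = FakeTaskInstance("context_sample", "print_context", "manual__2024-08-08T10:23:57.061809+00:00")
context = {
    "ds": "2024-08-08",
    "logical_date": datetime(2024, 8, 8, 10, 23, 57, 61809, tzinfo=timezone.utc),
    "params": {},
    "task_instance": ti,
    "ti": ti,
}
encoded = {key: encode(value) for key, value in context.items()}

print(type(encoded["ti"]).__name__, encoded["ti"])
print(degraded_keys(encoded))  # ['task_instance', 'ti']
```

In the log above, the same pattern shows up for `conf`, `dag_run`, `macros`, `task_instance` and `ti`, among others: their values are repr strings rather than `__type`/`__var` envelopes, so the task instance reaches the virtualenv as a plain string.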
Because `task_instance` arrives as a plain `str`, deserialization then fails at the line below, which accesses `.task` on it:
https://github.com/apache/airflow/blob/ec0e9f28eafe7484887b21ded0c7a78bfc590ce0/airflow/serialization/serialized_objects.py#L804
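A minimal sketch of that failing step and one possible guard. `decode_context_naive` reproduces only the attribute access on the linked line, not the rest of `BaseSerialization.deserialize`; `decode_context_guarded` and its error message are hypothetical. A guard like this only improves the error: the object cannot be rebuilt from its repr, so the real fix presumably belongs on the encoding side, with `TaskInstance` encoded under a type tag instead of via `str()`.

```python
from __future__ import annotations

from typing import Any


def decode_context_naive(d: dict[str, Any]) -> dict[str, Any]:
    # Same access as the linked line: assumes task_instance was rebuilt as an object.
    d["task"] = d["task_instance"].task
    return d


def decode_context_guarded(d: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical defensive variant: fail with an explicit message instead of AttributeError.
    ti = d.get("task_instance")
    if isinstance(ti, str):
        raise TypeError(
            f"task_instance was serialized via str() ({ti!r}); "
            "an explicit encoder is needed to restore it"
        )
    if ti is not None:
        d["task"] = ti.task
    return d


serialized = {
    "task_instance": "<TaskInstance: context_sample.print_context "
    "manual__2024-08-08T10:23:57.061809+00:00 [running]>"
}

try:
    decode_context_naive(dict(serialized))
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'task'

try:
    decode_context_guarded(dict(serialized))
except TypeError as exc:
    print(exc)
```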