jscheffl commented on PR #38992:
URL: https://github.com/apache/airflow/pull/38992#issuecomment-2120775487

   Okay, I did some testing with a Breeze setup: manually "massaged" the ENV to enable AIP-44, started the internal API server, took away the DB connection, and pointed the worker at the internal API endpoints. Using CeleryExecutor I got the following results - based on the "mothership" PR - not sure whether this helps in review.
   
   Based on example DAGs:
   
   1. The worker had problems parsing SubDagOperator. I was not able to copy the text from tmux, but the error stack (screenshot below) shows a failed DB select of the Pool object. Is that call missing in the internal API, or will SubDAGs simply not be supported?
   
![image](https://github.com/apache/airflow/assets/95105677/7b5071b3-2cc4-466a-b14f-0b5c395914f2)
   2. DAG example_python_operator - worked / executed w/o error
   3. DAG example_python_decorator - worked / executed w/o error
   4. DAG example_branch_operator - failed in task branching - logs below - seems there is still some direct DB access during branching (note after the logs).
   5. DAG example_dynamic_task_mapping - failed in task sum_it - logs below - seems the serialized XCom values come back with the wrong type (minimal repro after the logs).
   6. DAG tutorial - failed in task templated - logs below - seems to be an issue with template rendering (repro after the logs).
   7. DAG example_params_trigger_ui ("Params Trigger UI") - failed in task select_languages (as well as others) - logs below - seems to be a problem with serialized params (sketch after the logs).
   
   Logs for 4:
   ```
   (...)
   [2024-05-20, 18:03:32 CEST] {baseoperator.py:404} WARNING - BranchPythonOperator.execute cannot be called outside TaskInstance!
   [2024-05-20, 18:03:32 CEST] {python.py:240} INFO - Done. Returned value was: branch_a
   [2024-05-20, 18:03:32 CEST] {branch.py:36} INFO - Branch into branch_a
   [2024-05-20, 18:03:32 CEST] {skipmixin.py:178} INFO - Following branch branch_a
   [2024-05-20, 18:03:32 CEST] {taskinstance.py:742} ▼ Post task execution logs
   [2024-05-20, 18:03:32 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 108 for task branching (DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'; 7490)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
       return ti._run_raw_task(
              ^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
       return _run_raw_task(
              ^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
       return _execute_task(task_instance=self, context=context, task_orig=task_orig)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/operators/python.py", line 273, in execute
       return self.do_branch(context, super().execute(context))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/operators/branch.py", line 37, in do_branch
       self.skip_all_except(context["ti"], branches_to_execute)
     File "/opt/airflow/airflow/models/skipmixin.py", line 241, in skip_all_except
       if (downstream_ti := dag_run.get_task_instance(t.task_id, map_index=ti.map_index))
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'
   [2024-05-20, 18:03:32 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   ```
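
   My read of item 4: `DagRunPydantic.get_task_instance()` declares `session` as a required positional parameter, while the ORM `DagRun.get_task_instance()` gets its session injected via `@provide_session`, so callers such as `SkipMixin.skip_all_except()` never pass one explicitly. A minimal sketch of the signature alignment I would expect - purely an illustration on my side, not necessarily how the PR resolves it:

   ```python
   from airflow.utils.session import NEW_SESSION, provide_session


   class DagRunPydantic:  # stripped-down stub for illustration, not the real model
       @provide_session
       def get_task_instance(self, task_id, session=NEW_SESSION, *, map_index=-1):
           """Mirror DagRun.get_task_instance so the session is optional;
           under AIP-44 the lookup itself should go through the internal API."""
           ...
   ```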
   
   Logs for 5:
   ```
   (...)
   [2024-05-20, 17:42:51 CEST] {taskinstance.py:742} ▼ Post task execution logs
   [2024-05-20, 17:42:51 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 82 for task sum_it (unsupported operand type(s) for +: 'int' and 'str'; 4346)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
       return ti._run_raw_task(
              ^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
       return _run_raw_task(
              ^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
       result = self._execute_task(context, task_orig)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
       return _execute_task(task_instance=self, context=context, task_orig=task_orig)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
       return_value = super().execute(context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/operators/python.py", line 238, in execute
       return_value = self.execute_callable()
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
       return runner.run(*self.op_args, **self.op_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/example_dags/example_dynamic_task_mapping.py", line 35, in sum_it
       total = sum(values)
               ^^^^^^^^^^^
   TypeError: unsupported operand type(s) for +: 'int' and 'str'
   [2024-05-20, 17:42:52 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   ```
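
   For item 5 the failure reproduces without Airflow: `sum_it` receives the mapped XCom values, and judging by the TypeError they arrive as strings after the internal-API round trip, so `sum()` fails on the very first `0 + "1"`. Minimal repro of my interpretation (that the values deserialize as `str` is an assumption from the logs):

   ```python
   # Assumed shape of the mapped argument after the internal-API round trip:
   values = ["1", "2", "3"]  # the example DAG intends [1, 2, 3]

   # sum() starts from int 0, so the first addition is 0 + "1":
   total = sum(values)  # TypeError: unsupported operand type(s) for +: 'int' and 'str'
   ```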
   
   Logs for 6:
   ```
   (...)
   [2024-05-20, 17:28:53 CEST] {standard_task_runner.py:93} INFO - Job 33: Subtask templated
   [2024-05-20, 17:28:53 CEST] {task_command.py:462} INFO - Running task_id='templated' dag_id='tutorial' run_id='manual__2024-05-20T15:23:39.841642+00:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 28, 53, 331391, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)) duration=0.372874 state='running' try_number=2 max_tries=1 hostname='65a1e8ad69ac' unixname='root' job_id=33 pool='default_pool' pool_slots=1 queue='default' priority_weight=1 operator='BashOperator' custom_operator_name=None queued_dttm=datetime.datetime(2024, 5, 20, 15, 28, 50, 223477, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 351237, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d5da9e9f-715a-48d6-a34c-0eeeb5f3d74f' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(BashOperator): templated> test_mode=False dag_run=DagRunPydantic(id=9, dag_id='tutorial', queued_at=datetime.datetime(2024, 5, 20, 15, 23, 39, 884965, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 23, 40, 785817, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T15:23:39.841642+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 5, 19, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 28, 53, 73827, tzinfo=TzInfo(UTC)), dag_hash='4e598e3c2b0dd862a24b86c57cc9387b', updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 79029, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='tutorial', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 28, 43, 870484, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/tutorial.py', processor_subdir='/files/dags', owners='airflow', description='A simple tutorial DAG', default_view='grid', schedule_interval=datetime.timedelta(days=1), timetable_description='', tags=[DagTagPydantic(name='example', dag_id='tutorial')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
   [2024-05-20, 17:28:53 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
     Expected `int` but got `str` - serialized value may not be as expected
     return self.__pydantic_serializer__.to_python(
   [2024-05-20, 17:28:53 CEST] {abstractoperator.py:741} ERROR - Exception rendering Jinja template for task 'templated', field 'bash_command'. Template: '\n{% for i in range(5) %}\n    echo "{{ ds }}"\n    echo "{{ macros.ds_add(ds, 7)}}"\n{% endfor %}\n'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
       rendered_content = self.render_template(
                          ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
       return self._render(template, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
       return super()._render(template, context, dag=dag)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/template/templater.py", line 126, in _render
       return render_template_to_string(template, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
       return render_template(template, cast(MutableMapping[str, Any], context), native=False)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
       return "".join(nodes)
              ^^^^^^^^^^^^^^
     File "<template>", line 21, in root
     File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
       if not __self.is_safe_callable(__obj):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
       getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
       return self._fail_with_undefined_error()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
       raise self._undefined_exception(self._undefined_message)
   jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
   [2024-05-20, 17:28:53 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 33 for task templated ('str object' has no attribute 'ds_add'; 2098)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
       return ti._run_raw_task(
              ^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
       return _run_raw_task(
              ^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 2940, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 161, in render_templates
       return TaskInstance.render_templates(self=self, context=context, jinja_env=jinja_env)  # type: ignore[arg-type]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 3354, in render_templates
       original_task.render_template_fields(context, jinja_env)
     File "/opt/airflow/airflow/models/baseoperator.py", line 1375, in render_template_fields
       self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
     File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
       rendered_content = self.render_template(
                          ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
       return self._render(template, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
       return super()._render(template, context, dag=dag)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/template/templater.py", line 126, in _render
       return render_template_to_string(template, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
       return render_template(template, cast(MutableMapping[str, Any], context), native=False)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
       return "".join(nodes)
              ^^^^^^^^^^^^^^
     File "<template>", line 21, in root
     File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
       if not __self.is_safe_callable(__obj):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
       getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
       return self._fail_with_undefined_error()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
       raise self._undefined_exception(self._undefined_message)
   jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
   [2024-05-20, 17:28:53 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   ```
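
   Item 6 looks like the `macros` entry of the template context does not survive the round trip: by the time the worker renders the template it is apparently a plain string instead of the `airflow.macros` module, so `macros.ds_add` becomes a Jinja `Undefined`. The exact error message reproduces in plain Jinja (hedged repro; that `macros` really arrives as a `str` is my assumption from the logs):

   ```python
   import jinja2

   # Rendering the tutorial's expression with a string where the airflow.macros
   # module should be yields exactly the UndefinedError from the log above.
   env = jinja2.Environment()
   template = env.from_string('{{ macros.ds_add(ds, 7) }}')
   template.render(ds="2024-05-20", macros="airflow.macros")
   # jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
   ```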
   
   Logs for 7:
   ```
   (...)
   [2024-05-20, 17:21:43 CEST] {task_command.py:462} INFO - Running task_id='select_languages' dag_id='example_params_trigger_ui' run_id='manual__2024-05-20T17:21:36+02:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 21, 43, 236734, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)) duration=None state='running' try_number=1 max_tries=0 hostname='65a1e8ad69ac' unixname='root' job_id=24 pool='default_pool' pool_slots=1 queue='default' priority_weight=5 operator='_BranchPythonDecoratedOperator' custom_operator_name='@task.branch' queued_dttm=datetime.datetime(2024, 5, 20, 15, 21, 39, 246806, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 21, 43, 255403, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d04a166e-5f47-44cc-884c-3178e7f0e7bd' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(_BranchPythonDecoratedOperator): select_languages> test_mode=False dag_run=DagRunPydantic(id=8, dag_id='example_params_trigger_ui', queued_at=datetime.datetime(2024, 5, 20, 15, 21, 38, 752539, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 21, 39, 180598, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T17:21:36+02:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={'names': ['Linda', 'Martha', 'Thomas'], 'english': True, 'german': True, 'french': True}, data_interval_start=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 21, 42, 720624, tzinfo=TzInfo(UTC)), dag_hash='ac1f36ca0393ba5bbc744485395bc36c', updated_at=datetime.datetime(2024, 5, 20, 15, 21, 42, 737459, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='example_params_trigger_ui', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 21, 27, 370307, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/example_params_trigger_ui.py', processor_subdir='/files/dags', owners='airflow', description='Example DAG demonstrating the usage DAG params to model a trigger UI with a user form', default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[DagTagPydantic(name='params', dag_id='example_params_trigger_ui'), DagTagPydantic(name='example', dag_id='example_params_trigger_ui')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
   [2024-05-20, 17:21:43 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
     Expected `int` but got `str` - serialized value may not be as expected
     return self.__pydantic_serializer__.to_python(
   [2024-05-20, 17:21:43 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 24 for task select_languages (Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>; 1015)
   Traceback (most recent call last):
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
       return BaseSerialization.serialize(x, use_pydantic_models=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
       return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
       return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
       serialize_op["params"] = cls._serialize_params_dict(op.params)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
       raise ValueError(
   ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
       TaskInstance._execute_task_with_callbacks(
     File "/opt/airflow/airflow/models/taskinstance.py", line 2948, in _execute_task_with_callbacks
       _update_rtif(ti=self, rendered_fields=rendered_fields)
     File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
       args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
       str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
       mod = _pydantic_model_dump(pyd_mod, var)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
       return model_cls.model_validate(var).model_dump(mode="json")  # type: ignore[attr-defined]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
       return self.__pydantic_serializer__.to_python(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
   During handling of the above exception, another exception occurred:
   Traceback (most recent call last):
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
       return BaseSerialization.serialize(x, use_pydantic_models=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
       return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
       return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
       serialize_op["params"] = cls._serialize_params_dict(op.params)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
       raise ValueError(
   ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
     File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
       ret = args.func(args, dag=self.dag)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
       return ti._run_raw_task(
              ^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
       return _run_raw_task(
              ^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/models/taskinstance.py", line 341, in _run_raw_task
       ti.handle_failure(e, test_mode, context, session=session)
     File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 333, in handle_failure
       _handle_failure(
     File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
       args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
       str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
       mod = _pydantic_model_dump(pyd_mod, var)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
       return model_cls.model_validate(var).model_dump(mode="json")  # type: ignore[attr-defined]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
       return self.__pydantic_serializer__.to_python(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
   [2024-05-20, 17:21:43 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   ```
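
   Item 7: both the original failure in `_update_rtif` and the retry inside `handle_failure` die in `BaseSerialization.serialize(..., use_pydantic_models=True)`, because `_serialize_params_dict()` only accepts `airflow.models.param.Param` values while the task's `params` hold the resolved plain values from the trigger form (`'names'` is a `list`). A small sketch of the invariant that seems to be violated - the `ParamsDict` re-wrap is my illustration, not the PR's fix:

   ```python
   from airflow.models.param import Param, ParamsDict

   # Resolved trigger-form values as they appear on the failing task:
   resolved = {"names": ["Linda", "Martha", "Thomas"], "english": True}

   # _serialize_params_dict() raises for a plain value like this list:
   assert not isinstance(resolved["names"], Param)

   # ParamsDict wraps every plain value in a Param, which is the shape the
   # serializer insists on; get_param() exposes the wrapped object while
   # item access still resolves to the plain value:
   params = ParamsDict(resolved)
   assert isinstance(params.get_param("names"), Param)
   assert params["names"] == ["Linda", "Martha", "Thomas"]
   ```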

