jscheffl commented on PR #38992: URL: https://github.com/apache/airflow/pull/38992#issuecomment-2120775487
Okay, did some testing with a breeze setup, manually "massaged" the ENV to enable AIP-44, strated internal API server, took away the DB connection and set the internal API endpoints for the worker. Using CeleryExecutor got the following results - based on the "mothership" PR - don't know if this helps in review. Based on example DAGs: 1. Worker had problems parsing SubdagOperator, was not able to copy text from tmux, error stack is - failed in DB select of Pool object.. missing in internal API or will Subdag just not be supported?  2. DAG example_python_operator - worked / executed w/o error 3. DAG example_python_decorator - worked / executed w/o error 4. DAG example_branch_operator - failed in task branching - logs below - Seems there is still some DB access for branching 5. DAG example_dynamic_task_mapping - failed in task sum_if - logs below - Seems to be serialized values are wrong type? 6. DAG tutorial - failed in task templated - logs below - Seems to be an issue with templating 7. DAG Params Trigger UI - failed in Select languages (as well as othrs) - logs below - Seems to be problems with serialized parameters Logs for 4 ``` (...) [2024-05-20, 18:03:32 CEST] {baseoperator.py:404} WARNING - BranchPythonOperator.execute cannot be called outside TaskInstance! [2024-05-20, 18:03:32 CEST] {python.py:240} INFO - Done. Returned value was: branch_a [2024-05-20, 18:03:32 CEST] {branch.py:36} INFO - Branch into branch_a [2024-05-20, 18:03:32 CEST] {skipmixin.py:178} INFO - Following branch branch_a [2024-05-20, 18:03:32 CEST] {taskinstance.py:742} ▼ Post task execution logs [2024-05-20, 18:03:32 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 108 for task branching (DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'; 7490) Traceback (most recent call last): File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork ret = args.func(args, dag=self.dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run task_return_code = _run_task_by_selected_method(args, _dag, ti) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method return _run_raw_task(args, ti) ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task return ti._run_raw_task( ^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task return _run_raw_task( ^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task TaskInstance._execute_task_with_callbacks( File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks result = self._execute_task(context, task_orig) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task return _execute_task(task_instance=self, context=context, task_orig=task_orig) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable return ExecutionCallableRunner( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/operators/python.py", line 273, in execute return self.do_branch(context, super().execute(context)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/operators/branch.py", line 37, in do_branch self.skip_all_except(context["ti"], branches_to_execute) File "/opt/airflow/airflow/models/skipmixin.py", line 241, in skip_all_except if (downstream_ti := dag_run.get_task_instance(t.task_id, map_index=ti.map_index)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session' [2024-05-20, 18:03:32 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1 ``` Logs for 5 ``` (...) [2024-05-20, 17:42:51 CEST] {taskinstance.py:742} ▼ Post task execution logs [2024-05-20, 17:42:51 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 82 for task sum_it (unsupported operand type(s) for +: 'int' and 'str'; 4346) Traceback (most recent call last): File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork ret = args.func(args, dag=self.dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run task_return_code = _run_task_by_selected_method(args, _dag, ti) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method return _run_raw_task(args, ti) ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task return ti._run_raw_task( ^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task return _run_raw_task( ^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task TaskInstance._execute_task_with_callbacks( File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks result = self._execute_task(context, task_orig) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task return _execute_task(task_instance=self, context=context, task_orig=task_orig) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable return ExecutionCallableRunner( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/decorators/base.py", line 265, in execute return_value = super().execute(context) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/operators/python.py", line 238, in execute return_value = self.execute_callable() ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable return runner.run(*self.op_args, **self.op_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/example_dags/example_dynamic_task_mapping.py", line 35, in sum_it total = sum(values) ^^^^^^^^^^^ TypeError: unsupported operand type(s) for +: 'int' and 'str' [2024-05-20, 17:42:52 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1 ``` Logs for 6 ``` (...) [2024-05-20, 17:28:53 CEST] {standard_task_runner.py:93} INFO - Job 33: Subtask templated [2024-05-20, 17:28:53 CEST] {task_command.py:462} INFO - Running task_id='templated' dag_id='tutorial' run_id='manual__2024-05-20T15:23:39.841642+00:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 28, 53, 331391, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)) duration=0.372874 state='running' try_number=2 max_tries=1 hostname='65a1e8ad69ac' unixname='root' job_id=33 pool='default_pool' pool_slots=1 queue='default' priority_weight=1 operator='BashOperator' custom_operator_name=None queued_dttm=datetime.datetime(2024, 5, 20, 15, 28, 50, 223477, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 351237, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d5da9e9f-715a-48d6-a34c-0eeeb5f3d74f' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(BashOperato r): templated> test_mode=False dag_run=DagRunPydantic(id=9, dag_id='tutorial', queued_at=datetime.datetime(2024, 5, 20, 15, 23, 39, 884965, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 23, 40, 785817, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T15:23:39.841642+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 5, 19, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 28, 53, 73827, tzinfo=TzInfo(UTC)), dag_hash='4e598e3c2b0dd862a24b86c57cc9387b', updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 79029, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='tutorial', ro ot_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 28, 43, 870484, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/tutorial.py', processor_subdir='/files/dags', owners='airflow', description='A simple tutorial DAG', default_view='grid', schedule_interval=datetime.timedelta(days=1), timetable_description='', tags=[DagTagPydantic(name='example', dag_id='tutorial')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac [2024-05-20, 17:28:53 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings: Expected `int` but got `str` - serialized value may not be as expected return self.__pydantic_serializer__.to_python( [2024-05-20, 17:28:53 CEST] {abstractoperator.py:741} ERROR - Exception rendering Jinja template for task 'templated', field 'bash_command'. Template: '\n{% for i in range(5) %}\n echo "{{ ds }}"\n echo "{{ macros.ds_add(ds, 7)}}"\n{% endfor %}\n' Traceback (most recent call last): File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields rendered_content = self.render_template( ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/template/templater.py", line 169, in render_template return self._render(template, context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render return super()._render(template, context, dag=dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/template/templater.py", line 126, in _render return render_template_to_string(template, context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string return render_template(template, cast(MutableMapping[str, Any], context), native=False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template return "".join(nodes) ^^^^^^^^^^^^^^ File "<template>", line 21, in root File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call if not __self.is_safe_callable(__obj): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__ return self._fail_with_undefined_error() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error raise self._undefined_exception(self._undefined_message) jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add' [2024-05-20, 17:28:53 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 33 for task templated ('str object' has no attribute 'ds_add'; 2098) Traceback (most recent call last): File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork ret = args.func(args, dag=self.dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run task_return_code = _run_task_by_selected_method(args, _dag, ti) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method return _run_raw_task(args, ti) ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task return ti._run_raw_task( ^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task return _run_raw_task( ^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task TaskInstance._execute_task_with_callbacks( File "/opt/airflow/airflow/models/taskinstance.py", line 2940, in _execute_task_with_callbacks task_orig = self.render_templates(context=context, jinja_env=jinja_env) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 161, in render_templates return TaskInstance.render_templates(self=self, context=context, jinja_env=jinja_env) # type: ignore[arg-type] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 3354, in render_templates original_task.render_template_fields(context, jinja_env) File "/opt/airflow/airflow/models/baseoperator.py", line 1375, in render_template_fields self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields rendered_content = self.render_template( ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/template/templater.py", line 169, in render_template return self._render(template, context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render return super()._render(template, context, dag=dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/template/templater.py", line 126, in _render return render_template_to_string(template, context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string return render_template(template, cast(MutableMapping[str, Any], context), native=False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template return "".join(nodes) ^^^^^^^^^^^^^^ File "<template>", line 21, in root File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call if not __self.is_safe_callable(__obj): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__ return self._fail_with_undefined_error() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error raise self._undefined_exception(self._undefined_message) jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add' [2024-05-20, 17:28:53 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1 ``` Logs for 7: ``` (...) [2024-05-20, 17:21:43 CEST] {task_command.py:462} INFO - Running task_id='select_languages' dag_id='example_params_trigger_ui' run_id='manual__2024-05-20T17:21:36+02:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 21, 43, 236734, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)) duration=None state='running' try_number=1 max_tries=0 hostname='65a1e8ad69ac' unixname='root' job_id=24 pool='default_pool' pool_slots=1 queue='default' priority_weight=5 operator='_BranchPythonDecoratedOperator' custom_operator_name='@task.branch' queued_dttm=datetime.datetime(2024, 5, 20, 15, 21, 39, 246806, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 21, 43, 255403, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d04a166e-5f47-44cc-884c-3178e7f0e7bd' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as _user=None task=<Task(_BranchPythonDecoratedOperator): select_languages> test_mode=False dag_run=DagRunPydantic(id=8, dag_id='example_params_trigger_ui', queued_at=datetime.datetime(2024, 5, 20, 15, 21, 38, 752539, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 21, 39, 180598, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T17:21:36+02:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={'names': ['Linda', 'Martha', 'Thomas'], 'english': True, 'german': True, 'french': True}, data_interval_start=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 21, 42, 720624, tzinfo=TzInfo(UTC)), dag_hash='ac1f36ca0393ba5bbc744485395bc36c', updated_at=datetime.datetime(2024, 5, 20, 15, 21, 42, 7 37459, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='example_params_trigger_ui', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 21, 27, 370307, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/example_params_trigger_ui.py', processor_subdir='/files/dags', owners='airflow', description='Example DAG demonstrating the usage DAG params to model a trigger UI with a user form', default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[DagTagPydantic(name='params', dag_id='example_params_trigger_ui'), DagTagPydantic(name='example', dag_id='example_params_trigger_ui')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_ concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac [2024-05-20, 17:21:43 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings: Expected `int` but got `str` - serialized value may not be as expected return self.__pydantic_serializer__.to_python( [2024-05-20, 17:21:43 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 24 for task select_languages (Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>; 1015) Traceback (most recent call last): File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator return BaseSerialization.serialize(x, use_pydantic_models=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node serialize_op["params"] = cls._serialize_params_dict(op.params) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict raise ValueError( ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'> The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task TaskInstance._execute_task_with_callbacks( File "/opt/airflow/airflow/models/taskinstance.py", line 2948, in _execute_task_with_callbacks _update_rtif(ti=self, rendered_fields=rendered_fields) File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize mod = _pydantic_model_dump(pyd_mod, var) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump return self.__pydantic_serializer__.to_python( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'> During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator return BaseSerialization.serialize(x, use_pydantic_models=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node serialize_op["params"] = cls._serialize_params_dict(op.params) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict raise ValueError( ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'> The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork ret = args.func(args, dag=self.dag) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run task_return_code = _run_task_by_selected_method(args, _dag, ti) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method return _run_raw_task(args, ti) ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task return ti._run_raw_task( ^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task return _run_raw_task( ^^^^^^^^^^^^^^ File "/opt/airflow/airflow/models/taskinstance.py", line 341, in _run_raw_task ti.handle_failure(e, test_mode, context, session=session) File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 333, in handle_failure _handle_failure( File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper return func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize mod = _pydantic_model_dump(pyd_mod, var) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump return self.__pydantic_serializer__.to_python( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'> [2024-05-20, 17:21:43 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
