phi-friday opened a new issue, #41578:
URL: https://github.com/apache/airflow/issues/41578
### Apache Airflow version
2.10.0rc1
### If "Other Airflow 2 version" selected, which one?
2.10.0 (not rc)
### What happened?
A dynamically mapped task cannot execute when the DAG's `default_args` contains arbitrary keys that the operator does not accept.
```shell
dddbfe3b933a
*** Found local files:
***   * /opt/airflow/logs/dag_id=dynamic_task_error/run_id=manual__2024-08-19T08:19:45.119366+00:00/task_id=add/map_index=0/attempt=1.log
[2024-08-19, 17:19:47 KST] {local_task_job_runner.py:123} ▼ Pre task execution logs
[2024-08-19, 17:19:47 KST] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dynamic_task_error.add manual__2024-08-19T08:19:45.119366+00:00 map_index=0 [queued]>
[2024-08-19, 17:19:47 KST] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dynamic_task_error.add manual__2024-08-19T08:19:45.119366+00:00 map_index=0 [queued]>
[2024-08-19, 17:19:47 KST] {taskinstance.py:2856} INFO - Starting attempt 1 of 1
[2024-08-19, 17:19:47 KST] {taskinstance.py:2879} INFO - Executing <Mapped(_PythonDecoratedOperator): add> on 2024-08-19 08:19:45.119366+00:00
[2024-08-19, 17:19:47 KST] {warnings.py:112} WARNING - /home/***/.local/lib/python3.12/site-packages/***/task/task_runner/standard_task_runner.py:70: DeprecationWarning: This process (pid=18581) is multi-threaded, use of fork() may lead to deadlocks in the child.
  pid = os.fork()
[2024-08-19, 17:19:47 KST] {standard_task_runner.py:72} INFO - Started process 18604 to run task
[2024-08-19, 17:19:47 KST] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'dynamic_task_error', 'add', 'manual__2024-08-19T08:19:45.119366+00:00', '--job-id', '4088', '--raw', '--subdir', 'DAGS_FOLDER/dynamic_test.py', '--cfg-path', '/tmp/tmpz8eds5gr', '--map-index', '0']
[2024-08-19, 17:19:47 KST] {standard_task_runner.py:105} INFO - Job 4088: Subtask add
[2024-08-19, 17:19:47 KST] {task_command.py:467} INFO - Running <TaskInstance: dynamic_task_error.add manual__2024-08-19T08:19:45.119366+00:00 map_index=0 [running]> on host dddbfe3b933a
[2024-08-19, 17:19:47 KST] {taskinstance.py:3301} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3105, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3524, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/mappedoperator.py", line 913, in render_template_fields
    unmapped_task = self.unmap(mapped_kwargs)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/mappedoperator.py", line 824, in unmap
    op = self.operator_class(**kwargs, _airflow_from_mapped=True)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 490, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/decorators/python.py", line 52, in __init__
    super().__init__(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 490, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/decorators/base.py", line 258, in __init__
    super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 490, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/operators/python.py", line 222, in __init__
    super().__init__(**kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 490, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 938, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: add__1). Invalid arguments were:
**kwargs: {'key': 'value', 'arbitrary': 'value'}
[2024-08-19, 17:19:47 KST] {taskinstance.py:3349} ERROR - Unable to unmap task to determine if we need to send an alert email
[2024-08-19, 17:19:47 KST] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=dynamic_task_error, task_id=add, run_id=manual__2024-08-19T08:19:45.119366+00:00, map_index=0, execution_date=20240819T081945, start_date=20240819T081947, end_date=20240819T081947
[2024-08-19, 17:19:47 KST] {taskinstance.py:340} ▶ Post task execution logs
```
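The invalid keyword arguments listed in the exception are exactly the arbitrary `default_args` keys. The same validation can presumably be triggered directly; the snippet below is a minimal sketch (assuming an Airflow 2.10 environment, with a hypothetical `probe` task id) of `BaseOperator` rejecting keyword arguments it does not recognize:
```python
# Minimal sketch (assumes an Airflow 2.10 environment): constructing an
# operator with an unrecognized keyword argument trips the same validation
# that fires when the mapped operator is re-created via unmap().
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

try:
    PythonOperator(task_id="probe", python_callable=lambda: None, key="value")
except AirflowException as exc:
    # e.g. "Invalid arguments were passed to PythonOperator (task_id: probe). ..."
    print(exc)
```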
### What you think should happen instead?
Unused `default_args` entries should be ignored for mapped tasks, exactly as they are for any other task.
As the DAG in the "How to reproduce" section shows, the plain task `success` executes, but the mapped task `add` (the `added_values` expansion) does not.
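Until the unmap path tolerates extra `default_args` keys, one possible workaround (a sketch, not verified against this bug; the `dynamic_task_workaround` DAG id is hypothetical) is to carry arbitrary shared values in DAG-level `params`, which every operator accepts, and keep only recognized operator arguments in `default_args`:
```python
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    # Assumption: arbitrary shared values live in params (accepted by every
    # operator), while default_args keeps only arguments BaseOperator
    # actually understands, so unmap() passes no unknown kwargs.
    params={"key": "value", "arbitrary": "value"},
    default_args={"do_xcom_push": False},
)
def dynamic_task_workaround() -> None:
    @task.python(do_xcom_push=True)
    def add(x: int, y: int) -> int:
        # The shared values remain reachable at runtime through the context.
        shared = get_current_context()["params"]["key"]
        print(shared)
        return x + y

    add.partial(y=10).expand(x=[1, 2, 3])


dynamic_task_workaround()
```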
### How to reproduce
```python
from __future__ import annotations

from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"key": "value", "arbitrary": "value", "do_xcom_push": False},
)
def dynamic_task_error() -> None:
    @task.python()
    def success() -> str:
        return "success"

    @task.python(do_xcom_push=True)
    def add(x: int, y: int) -> int:
        return x + y

    do_success = success()
    added_values = add.partial(y=10).expand(x=[1, 2, 3])
    _ = do_success >> added_values


dynamic_task_error()
```
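For quick local iteration without a scheduler, the reproduction can presumably be run in-process with `DAG.test()` (available since Airflow 2.5); the sketch below assumes it is appended to the reproduction file above:
```python
# Hypothetical local runner appended to the reproduction file: DAG.test()
# executes the DAG in-process and should surface the same unmap failure
# on the mapped "add" task.
if __name__ == "__main__":
    repro_dag = dynamic_task_error()
    repro_dag.test()
```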
### Operating System
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux"
VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
### Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.7.3
apache-airflow-providers-common-compat==1.1.0
apache-airflow-providers-common-io==1.4.0
apache-airflow-providers-common-sql==1.15.0
apache-airflow-providers-docker==3.12.3
apache-airflow-providers-fab==1.2.2
apache-airflow-providers-ftp==3.10.1
apache-airflow-providers-http==4.12.0
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-jdbc==4.4.0
apache-airflow-providers-odbc==4.6.3
apache-airflow-providers-postgres==5.11.3
apache-airflow-providers-redis==3.7.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.2
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)