aru-trackunit opened a new issue, #35058:
URL: https://github.com/apache/airflow/issues/35058
### Apache Airflow version
2.7.2
### What happened
My intention was to create a task using task flow, but I was unable to get
it working so I wrote an equivalent example with a traditional approach. Have I
done any mistake here? It should be easy to reproduce the error, by copying DAG
below:
```
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s
from datetime import datetime
DAG_ID = "test-task-decorator"
default_args = {
"owner": "team_A",
"depends_on_past": False,
"start_date": datetime(2023, 10, 19),
"urgency": "low",
}
METRICS_PATH =
"dags/data_quality/json_definition/cumulative_operating_hours_metrics.json"
with DAG(
dag_id=DAG_ID,
description="UTC Timezone",
default_args=default_args,
schedule_interval="0 3 * * *",
catchup=True,
max_active_runs=1,
tags=["data_quality"],
) as dag:
def generate_params():
return [{
'calculation_params': {'foo': 'bar'},
'alarm_threshold': {'foo': 'bar'}
}]
generating_parameters = PythonOperator(
task_id="generate_params",
python_callable=generate_params,
)
@task(
retries=0,
executor_config={
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
labels={
"owner": default_args["owner"],
"tier": default_args["urgency"],
}
)
)
}
)
def task_approach(calculation_params: dict, alarm_threshold: dict):
assert type(isinstance(calculation_params, dict))
assert type(isinstance(alarm_threshold, dict))
print(calculation_params)
print(alarm_threshold)
def example_method_traditional_approach(calculation_params: dict,
alarm_threshold: dict):
assert type(isinstance(calculation_params, dict))
assert type(isinstance(alarm_threshold, dict))
print(calculation_params)
print(alarm_threshold)
trigger_alarms =
task_approach.expand_kwargs(generating_parameters.output)
test_assert_metrics = PythonOperator.partial(
task_id="traditional_approach",
executor_config={
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(
labels={
"owner": default_args["owner"],
"tier": default_args["urgency"],
}
)
)
},
python_callable=example_method_traditional_approach
).expand(op_kwargs=generating_parameters.output)
```
Task approach generates the following error
```
28adcc5bba6d
*** Found local files:
*** *
/opt/airflow/logs/dag_id=test-task-decorator/run_id=manual__2023-10-19T13:33:41.419714+00:00/task_id=task_approach/map_index=0/attempt=1.log
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1157} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance:
test-task-decorator.task_approach manual__2023-10-19T13:33:41.419714+00:00
map_index=0 [queued]>
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1157} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance:
test-task-decorator.task_approach manual__2023-10-19T13:33:41.419714+00:00
map_index=0 [queued]>
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1359} INFO - Starting attempt 1
of 1
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1380} INFO - Executing
<Mapped(_PythonDecoratedOperator): task_approach> on 2023-10-19
13:33:41.419714+00:00
[2023-10-19, 15:33:44 CEST] {standard_task_runner.py:57} INFO - Started
process 11242 to run task
[2023-10-19, 15:33:44 CEST] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'test-task-decorator', 'task_approach',
'manual__2023-10-19T13:33:41.419714+00:00', '--job-id', '222', '--raw',
'--subdir', 'DAGS_FOLDER/data_quality/insights/test.py', '--cfg-path',
'/tmp/tmpgoaqdfif', '--map-index', '0']
[2023-10-19, 15:33:44 CEST] {standard_task_runner.py:85} INFO - Job 222:
Subtask task_approach
[2023-10-19, 15:33:44 CEST] {task_command.py:415} INFO - Running
<TaskInstance: test-task-decorator.task_approach
manual__2023-10-19T13:33:41.419714+00:00 map_index=0 [running]> on host
28adcc5bba6d
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1935} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
line 1516, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
line 1645, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py",
line 2283, in render_templates
original_task.render_template_fields(context)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py",
line 725, in render_template_fields
unmapped_task = self.unmap(mapped_kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py",
line 643, in unmap
op = self.operator_class(**kwargs, _airflow_from_mapped=True)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/python.py",
line 49, in __init__
super().__init__(
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/base.py",
line 213, in __init__
super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py",
line 177, in __init__
super().__init__(**kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py",
line 794, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to
_PythonDecoratedOperator (task_id: task_approach__1). Invalid arguments were:
**kwargs: {'urgency': 'low'}
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1981} ERROR - Unable to unmap
task to determine if we need to send an alert email
[2023-10-19, 15:33:44 CEST] {taskinstance.py:1398} INFO - Marking task as
FAILED. dag_id=test-task-decorator, task_id=task_approach, map_index=0,
execution_date=20231019T133341, start_date=20231019T133344,
end_date=20231019T133344
[2023-10-19, 15:33:44 CEST] {standard_task_runner.py:104} ERROR - Failed to
execute job 222 for task task_approach (Invalid arguments were passed to
_PythonDecoratedOperator (task_id: task_approach__1). Invalid arguments were:
**kwargs: {'urgency': 'low'}; 11242)
[2023-10-19, 15:33:44 CEST] {local_task_job_runner.py:228} INFO - Task
exited with return code 1
[2023-10-19, 15:33:44 CEST] {taskinstance.py:2776} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
### What you think should happen instead
_No response_
### How to reproduce
Please copy the dag attached above and execute in airflow.
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon | 8.7.1 | Amazon integration (including
Amazon Web Services (AWS)).
-- | -- | --
apache-airflow-providers-cncf-kubernetes | 7.6.0 | Kubernetes
apache-airflow-providers-common-sql | 1.7.2 | Common SQL Provider
apache-airflow-providers-databricks | 4.5.0 | Databricks
apache-airflow-providers-ftp | 3.5.1 | File Transfer Protocol (FTP)
apache-airflow-providers-google | 10.9.0 | Google services including: -
Google Ads - Google Cloud (GCP) - Google Firebase - Google LevelDB -
Google Marketing Platform - Google Workspace (formerly Google Suite)
apache-airflow-providers-hashicorp | 3.4.3 | Hashicorp including Hashicorp
Vault
apache-airflow-providers-http | 4.5.1 | Hypertext Transfer Protocol (HTTP)
apache-airflow-providers-imap | 3.3.1 | Internet Message Access Protocol
(IMAP)
apache-airflow-providers-mysql | 5.3.1 | MySQL
apache-airflow-providers-postgres | 5.6.1 | PostgreSQL
apache-airflow-providers-sftp | 4.6.1 | SSH File Transfer Protocol (SFTP)
apache-airflow-providers-sqlite | 3.4.3 | SQLite
apache-airflow-providers-ssh | 3.7.3 | Secure Shell (SSH)
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]