aru-trackunit opened a new issue, #35058:
URL: https://github.com/apache/airflow/issues/35058

   ### Apache Airflow version
   
   2.7.2
   
   ### What happened
   
   My intention was to create a task using the TaskFlow API, but I was unable to get it working, so I also wrote an equivalent example with the traditional approach. Have I made a mistake here? The error should be easy to reproduce by copying the DAG below:
   
    ```python
   from airflow import DAG
   from airflow.decorators import task
   from airflow.operators.python import PythonOperator
   from kubernetes.client import models as k8s
   from datetime import datetime
   
   DAG_ID = "test-task-decorator"
   default_args = {
       "owner": "team_A",
       "depends_on_past": False,
       "start_date": datetime(2023, 10, 19),
       "urgency": "low",
   }
   
    METRICS_PATH = "dags/data_quality/json_definition/cumulative_operating_hours_metrics.json"
   
   with DAG(
           dag_id=DAG_ID,
           description="UTC Timezone",
           default_args=default_args,
           schedule_interval="0 3 * * *",
           catchup=True,
           max_active_runs=1,
           tags=["data_quality"],
   ) as dag:
   
       def generate_params():
           return [{
               'calculation_params': {'foo': 'bar'},
               'alarm_threshold': {'foo': 'bar'}
           }]
   
       generating_parameters = PythonOperator(
           task_id="generate_params",
           python_callable=generate_params,
       )
   
       @task(
           retries=0,
           executor_config={
               "pod_override": k8s.V1Pod(
                   metadata=k8s.V1ObjectMeta(
                       labels={
                           "owner": default_args["owner"],
                           "tier": default_args["urgency"],
                       }
                   )
               )
           }
       )
       def task_approach(calculation_params: dict, alarm_threshold: dict):
        assert isinstance(calculation_params, dict)
        assert isinstance(alarm_threshold, dict)
           print(calculation_params)
           print(alarm_threshold)
   
   
    def example_method_traditional_approach(calculation_params: dict, alarm_threshold: dict):
        assert isinstance(calculation_params, dict)
        assert isinstance(alarm_threshold, dict)
           print(calculation_params)
           print(alarm_threshold)
   
   
    trigger_alarms = task_approach.expand_kwargs(generating_parameters.output)
   
       test_assert_metrics = PythonOperator.partial(
           task_id="traditional_approach",
           executor_config={
               "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(
                   labels={
                       "owner": default_args["owner"],
                       "tier": default_args["urgency"],
                   }
               )
               )
           },
           python_callable=example_method_traditional_approach
       ).expand(op_kwargs=generating_parameters.output)
   ```
   The TaskFlow approach generates the following error:
   
   ```
    28adcc5bba6d
    *** Found local files:
    ***   * /opt/airflow/logs/dag_id=test-task-decorator/run_id=manual__2023-10-19T13:33:41.419714+00:00/task_id=task_approach/map_index=0/attempt=1.log
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test-task-decorator.task_approach manual__2023-10-19T13:33:41.419714+00:00 map_index=0 [queued]>
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test-task-decorator.task_approach manual__2023-10-19T13:33:41.419714+00:00 map_index=0 [queued]>
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1380} INFO - Executing <Mapped(_PythonDecoratedOperator): task_approach> on 2023-10-19 13:33:41.419714+00:00
    [2023-10-19, 15:33:44 CEST] {standard_task_runner.py:57} INFO - Started process 11242 to run task
    [2023-10-19, 15:33:44 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'test-task-decorator', 'task_approach', 'manual__2023-10-19T13:33:41.419714+00:00', '--job-id', '222', '--raw', '--subdir', 'DAGS_FOLDER/data_quality/insights/test.py', '--cfg-path', '/tmp/tmpgoaqdfif', '--map-index', '0']
    [2023-10-19, 15:33:44 CEST] {standard_task_runner.py:85} INFO - Job 222: Subtask task_approach
    [2023-10-19, 15:33:44 CEST] {task_command.py:415} INFO - Running <TaskInstance: test-task-decorator.task_approach manual__2023-10-19T13:33:41.419714+00:00 map_index=0 [running]> on host 28adcc5bba6d
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1935} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1516, in _run_raw_task
        self._execute_task_with_callbacks(context, test_mode, session=session)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1645, in _execute_task_with_callbacks
        task_orig = self.render_templates(context=context)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2283, in render_templates
        original_task.render_template_fields(context)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 725, in render_template_fields
        unmapped_task = self.unmap(mapped_kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 643, in unmap
        op = self.operator_class(**kwargs, _airflow_from_mapped=True)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
        result = func(self, **kwargs, default_args=default_args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/python.py", line 49, in __init__
        super().__init__(
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
        result = func(self, **kwargs, default_args=default_args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/base.py", line 213, in __init__
        super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
        result = func(self, **kwargs, default_args=default_args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 177, in __init__
        super().__init__(**kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
        result = func(self, **kwargs, default_args=default_args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 794, in __init__
        raise AirflowException(
    airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: task_approach__1). Invalid arguments were:
    **kwargs: {'urgency': 'low'}
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1981} ERROR - Unable to unmap task to determine if we need to send an alert email
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=test-task-decorator, task_id=task_approach, map_index=0, execution_date=20231019T133341, start_date=20231019T133344, end_date=20231019T133344
    [2023-10-19, 15:33:44 CEST] {standard_task_runner.py:104} ERROR - Failed to execute job 222 for task task_approach (Invalid arguments were passed to _PythonDecoratedOperator (task_id: task_approach__1). Invalid arguments were:
    **kwargs: {'urgency': 'low'}; 11242)
    [2023-10-19, 15:33:44 CEST] {local_task_job_runner.py:228} INFO - Task exited with return code 1
    [2023-10-19, 15:33:44 CEST] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
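
   The failing argument is the custom `urgency` key in `default_args`, which Airflow forwards as an operator kwarg when it unmaps the mapped task. A minimal sketch of a possible workaround, assuming that is the root cause: keep non-operator metadata in a separate dict and reference it directly (the `CUSTOM_META` name is my own invention, not part of the original DAG):

```python
from datetime import datetime

# Hypothetical workaround sketch: every default_args key is forwarded as an
# operator keyword argument during unmapping, so a custom key like "urgency"
# triggers "Invalid arguments were passed". Keeping custom metadata in a
# separate dict (CUSTOM_META, a name invented for this sketch) sidesteps that.
CUSTOM_META = {"urgency": "low"}

default_args = {
    # Only real BaseOperator arguments remain here.
    "owner": "team_A",
    "depends_on_past": False,
    "start_date": datetime(2023, 10, 19),
}

# Pod labels can then read from both dicts explicitly:
pod_labels = {
    "owner": default_args["owner"],
    "tier": CUSTOM_META["urgency"],
}
```

   With this split, the `executor_config` pod labels stay the same, but nothing unexpected reaches the operator constructor.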
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Copy the DAG above and run it in Airflow.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   | Provider | Version | Description |
   | -- | -- | -- |
   | apache-airflow-providers-amazon | 8.7.1 | Amazon integration (including Amazon Web Services (AWS)) |
   | apache-airflow-providers-cncf-kubernetes | 7.6.0 | Kubernetes |
   | apache-airflow-providers-common-sql | 1.7.2 | Common SQL Provider |
   | apache-airflow-providers-databricks | 4.5.0 | Databricks |
   | apache-airflow-providers-ftp | 3.5.1 | File Transfer Protocol (FTP) |
   | apache-airflow-providers-google | 10.9.0 | Google services, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, Google Workspace (formerly Google Suite) |
   | apache-airflow-providers-hashicorp | 3.4.3 | Hashicorp, including Hashicorp Vault |
   | apache-airflow-providers-http | 4.5.1 | Hypertext Transfer Protocol (HTTP) |
   | apache-airflow-providers-imap | 3.3.1 | Internet Message Access Protocol (IMAP) |
   | apache-airflow-providers-mysql | 5.3.1 | MySQL |
   | apache-airflow-providers-postgres | 5.6.1 | PostgreSQL |
   | apache-airflow-providers-sftp | 4.6.1 | SSH File Transfer Protocol (SFTP) |
   | apache-airflow-providers-sqlite | 3.4.3 | SQLite |
   | apache-airflow-providers-ssh | 3.7.3 | Secure Shell (SSH) |
   
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   
