josh-fell opened a new issue, #24388:
URL: https://github.com/apache/airflow/issues/24388
### Apache Airflow version
2.3.2 (latest released)
### What happened
When attempting to generate mapped SQL tasks using a Jinja-templated query,
an exception like the following is thrown:
`jinja2.exceptions.UndefinedError:
'airflow.models.mappedoperator.MappedOperator object' has no attribute
'<operator attribute>'`
For example, when attempting to map `SQLValueCheckOperator` tasks with
respect to `database` using a query of `SELECT COUNT(*) FROM {{ task.database
}}.tbl;`:
`jinja2.exceptions.UndefinedError:
'airflow.models.mappedoperator.MappedOperator object' has no attribute
'database'`
Or, when using `SnowflakeOperator` and mapping via `parameters` of a query
like `SELECT * FROM {{ task.parameters.tbl }};`:
`jinja2.exceptions.UndefinedError:
'airflow.models.mappedoperator.MappedOperator object' has no attribute
'parameters'`
### What you think should happen instead
When using Jinja-templated SQL queries, the attribute being used for the mapping should be accessible via `{{ task.<operator attribute> }}`.
Executing the same SQL query with classic, non-mapped tasks allows this
operator-attribute access from the `task` context object. Ideally, the same
interface should apply to both non-mapped and mapped tasks.
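For context, here is a minimal Jinja-only sketch (no Airflow imports; `ClassicTask` and `MappedTask` are hypothetical stand-ins for a plain operator instance and a `MappedOperator`) showing why rendering succeeds only when the object bound to `task` actually exposes the attribute:
```python
import jinja2

env = jinja2.Environment(undefined=jinja2.StrictUndefined)
template = env.from_string("SELECT COUNT(*) FROM {{ task.database }}.tbl;")


class ClassicTask:
    # stands in for a plain operator instance, which carries the attribute
    database = "dev"


class MappedTask:
    # stands in for MappedOperator: the expanded kwarg is not a plain attribute
    pass


print(template.render(task=ClassicTask()))  # SELECT COUNT(*) FROM dev.tbl;

# This render raises jinja2.exceptions.UndefinedError:
# "'... MappedTask object' has no attribute 'database'"
template.render(task=MappedTask())
```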
### How to reproduce
Consider the following DAG:
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.sql import SQLValueCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

CORE_SQL = "SELECT COUNT(*) FROM {{ task.database }}.tbl;"
SNOWFLAKE_SQL = """SELECT * FROM {{ task.parameters.tbl }};"""


@dag(dag_id="map-city", start_date=datetime(2022, 6, 7), schedule_interval=None)
def map_city():
    classic_sql_value_check = SQLValueCheckOperator(
        task_id="classic_sql_value_check",
        conn_id="snowflake",
        sql=CORE_SQL,
        database="dev",
        pass_value=20000,
    )

    mapped_value_check = SQLValueCheckOperator.partial(
        task_id="check_row_count",
        conn_id="snowflake",
        sql=CORE_SQL,
        pass_value=20000,
    ).expand(database=["dev", "production"])

    classic_snowflake_task = SnowflakeOperator(
        task_id="classic_snowflake_task",
        snowflake_conn_id="snowflake",
        sql=SNOWFLAKE_SQL,
        parameters={"tbl": "foo"},
    )

    mapped_snowflake_task = SnowflakeOperator.partial(
        task_id="mapped_snowflake_task",
        snowflake_conn_id="snowflake",
        sql=SNOWFLAKE_SQL,
    ).expand(
        parameters=[
            {"tbl": "foo"},
            {"tbl": "bar"},
        ]
    )


_ = map_city()
```
**`SQLValueCheckOperator` tasks**
The logs for the non-mapped "classic_sql_value_check" task show the query
executing as expected:
`[2022-06-11, 02:01:03 UTC] {sql.py:204} INFO - Executing SQL check: SELECT
COUNT(*) FROM dev.tbl;`
while the mapped "check_row_count" task fails with the following exception:
```bash
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running:
['airflow', 'tasks', 'run', 'map-city', 'check_row_count',
'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '350', '--raw',
'--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmpm5bg9mt5',
'--map-index', '0', '--error-file', '/tmp/tmp2kbilt2l']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 350:
Subtask check_row_count
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running
<TaskInstance: map-city.check_row_count
manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host
569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1451, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1555, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
2212, in render_templates
rendered_task = self.task.render_template_fields(context)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line
726, in render_template_fields
self._do_render_template_fields(
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py",
line 344, in _do_render_template_fields
rendered_content = self.render_template(
File
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py",
line 391, in render_template
return render_template_to_string(template, context)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py",
line 296, in render_template_to_string
return render_template(template, context, native=False)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py",
line 291, in render_template
return "".join(nodes)
File "<template>", line 13, in root
File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903,
in _fail_with_undefined_error
raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError:
'airflow.models.mappedoperator.MappedOperator object' has no attribute
'database'
```
**`SnowflakeOperator` tasks**
Similarly, the "classic_snowflake_task" non-mapped task is able to execute
the SQL query as expected:
`[2022-06-11, 02:01:04 UTC] {snowflake.py:324} INFO - Running statement:
SELECT * FROM foo;, parameters: {'tbl': 'foo'}`
while the mapped "mapped_snowflake_task" task fails to execute the query:
```bash
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running:
['airflow', 'tasks', 'run', 'map-city', 'mapped_snowflake_task',
'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '347', '--raw',
'--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmp6kmqs5ew',
'--map-index', '0', '--error-file', '/tmp/tmpkufg9xqx']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 347:
Subtask mapped_snowflake_task
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running
<TaskInstance: map-city.mapped_snowflake_task
manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host
569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1451, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
1555, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line
2212, in render_templates
rendered_task = self.task.render_template_fields(context)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line
726, in render_template_fields
self._do_render_template_fields(
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py",
line 68, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py",
line 344, in _do_render_template_fields
rendered_content = self.render_template(
File
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py",
line 391, in render_template
return render_template_to_string(template, context)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py",
line 296, in render_template_to_string
return render_template(template, context, native=False)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py",
line 291, in render_template
return "".join(nodes)
File "<template>", line 13, in root
File "/usr/local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326,
in getattr
value = getattr(obj, attribute)
File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 910,
in __getattr__
return self._fail_with_undefined_error()
File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903,
in _fail_with_undefined_error
raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError:
'airflow.models.mappedoperator.MappedOperator object' has no attribute
'parameters'
```
### Operating System
Debian GNU/Linux 10 (buster)
### Versions of Apache Airflow Providers
apache-airflow-providers-snowflake==2.7.0
### Deployment
Astronomer
### Deployment details
Astronomer Runtime 5.0.3
### Anything else
Even though the `{{ task.<operator attr> }}` method does not work for mapped
tasks, there is a workaround. Given the `SnowflakeOperator` example above,
which attempts to execute the query `SELECT * FROM {{ task.parameters.tbl }};`,
users can modify the query to `SELECT * FROM {{
task.mapped_kwargs.parameters[ti.map_index].tbl }};` for successful execution
(see the sketch below).
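A sketch of that workaround, assuming it is dropped into the `map_city()` DAG body from the reproduction example above; the task id is hypothetical, while the connection id, SQL, and mapping arguments are reused from that DAG:
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Reference the expanded kwarg for the current map index instead of
# "task.parameters", which does not exist on the MappedOperator.
SNOWFLAKE_SQL_WORKAROUND = (
    "SELECT * FROM {{ task.mapped_kwargs.parameters[ti.map_index].tbl }};"
)

mapped_snowflake_task_workaround = SnowflakeOperator.partial(
    task_id="mapped_snowflake_task_workaround",  # hypothetical task id
    snowflake_conn_id="snowflake",
    sql=SNOWFLAKE_SQL_WORKAROUND,
).expand(
    parameters=[
        {"tbl": "foo"},
        {"tbl": "bar"},
    ]
)
```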
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)