josh-fell opened a new issue, #24388:
URL: https://github.com/apache/airflow/issues/24388

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   When attempting to generate mapped SQL tasks using a Jinja-templated query, 
an exception like the following is thrown:
   
   `jinja2.exceptions.UndefinedError: 
'airflow.models.mappedoperator.MappedOperator object' has no attribute 
'<operator attribute>'`
   
   For example, when attempting to map `SQLValueCheckOperator` tasks with 
respect to `database` using a query of `SELECT COUNT(*) FROM {{ task.database 
}}.tbl;`:
   `jinja2.exceptions.UndefinedError: 
'airflow.models.mappedoperator.MappedOperator object' has no attribute 
'database'`
   
   Or, when using `SnowflakeOperator` and mapping via `parameters` of a query 
like `SELECT * FROM {{ task.parameters.tbl }};`:
   `jinja2.exceptions.UndefinedError: 
'airflow.models.mappedoperator.MappedOperator object' has no attribute 
'parameters'`
   
   ### What you think should happen instead
   
   When using Jinja-template SQL queries, the attribute that is being using for 
the mapping should be accessible via `{{ task.<operator attribute>. }}`. 
Executing the same SQL query with classic, non-mapped tasks allows for this 
operator attr access from the `task` context object. Ideally, the same 
interface should apply for both non-mapped and mapped tasks.
   
   ### How to reproduce
   
   Consider the following DAG:
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.operators.sql import SQLValueCheckOperator
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   
   
   CORE_SQL = "SELECT COUNT(*) FROM {{ task.database }}.tbl;"
   SNOWFLAKE_SQL = """SELECT * FROM {{ task.parameters.tbl }};"""
   
   
   @dag(dag_id="map-city", start_date=datetime(2022, 6, 7), 
schedule_interval=None)
   def map_city():
           classic_sql_value_check = SQLValueCheckOperator(
           task_id="classic_sql_value_check",
           conn_id="snowflake",
           sql=CORE_SQL,
           database="dev",
           pass_value=20000,
       )
   
       mapped_value_check = SQLValueCheckOperator.partial(
           task_id="check_row_count",
           conn_id="snowflake",
           sql=CORE_SQL,
           pass_value=20000,
       ).expand(database=["dev", "production"])
   
       classic_snowflake_task = SnowflakeOperator(
           task_id="classic_snowflake_task",
           snowflake_conn_id="snowflake",
           sql=SNOWFLAKE_SQL,
           parameters={"tbl": "foo"},
       )
   
       mapped_snowflake_task = SnowflakeOperator.partial(
           task_id="mapped_snowflake_task", snowflake_conn_id="snowflake", 
sql=SNOWFLAKE_SQL
       ).expand(
           parameters=[
               {"tbl": "foo"},
               {"tbl": "bar"},
           ]
       )
   
   
   _ = map_city()
   ```
   
   **`SQLValueCheckOperator` tasks**
   The logs for the "classic_sql_value_check", non-mapped task show the query 
executing as expected:
    `[2022-06-11, 02:01:03 UTC] {sql.py:204} INFO - Executing SQL check: SELECT 
COUNT(*) FROM dev.tbl;`
   while the mapped "check_row_count" task fails with the following exception:
   ```bash
   [2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: 
['airflow', 'tasks', 'run', 'map-city', 'check_row_count', 
'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '350', '--raw', 
'--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmpm5bg9mt5', 
'--map-index', '0', '--error-file', '/tmp/tmp2kbilt2l']
   [2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 350: 
Subtask check_row_count
   [2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running 
<TaskInstance: map-city.check_row_count 
manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 
569596df5be5
   [2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
1451, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
1555, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
2212, in render_templates
       rendered_task = self.task.render_template_fields(context)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 
726, in render_template_fields
       self._do_render_template_fields(
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", 
line 344, in _do_render_template_fields
       rendered_content = self.render_template(
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", 
line 391, in render_template
       return render_template_to_string(template, context)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", 
line 296, in render_template_to_string
       return render_template(template, context, native=False)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", 
line 291, in render_template
       return "".join(nodes)
     File "<template>", line 13, in root
     File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, 
in _fail_with_undefined_error
       raise self._undefined_exception(self._undefined_message)
   jinja2.exceptions.UndefinedError: 
'airflow.models.mappedoperator.MappedOperator object' has no attribute 
'database'
   ```
   
   **`SnowflakeOperator` tasks**
   Similarly, the "classic_snowflake_task" non-mapped task is able to execute 
the SQL query as expected:
   `[2022-06-11, 02:01:04 UTC] {snowflake.py:324} INFO - Running statement: 
SELECT * FROM foo;, parameters: {'tbl': 'foo'}`
   while the mapped "mapped_snowflake_task task fails to execute the query:
   ```bash
   [2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: 
['airflow', 'tasks', 'run', 'map-city', 'mapped_snowflake_task', 
'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '347', '--raw', 
'--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmp6kmqs5ew', 
'--map-index', '0', '--error-file', '/tmp/tmpkufg9xqx']
   [2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 347: 
Subtask mapped_snowflake_task
   [2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running 
<TaskInstance: map-city.mapped_snowflake_task 
manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 
569596df5be5
   [2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
1451, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
1555, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 
2212, in render_templates
       rendered_task = self.task.render_template_fields(context)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 
726, in render_template_fields
       self._do_render_template_fields(
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", 
line 344, in _do_render_template_fields
       rendered_content = self.render_template(
     File 
"/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", 
line 391, in render_template
       return render_template_to_string(template, context)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", 
line 296, in render_template_to_string
       return render_template(template, context, native=False)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", 
line 291, in render_template
       return "".join(nodes)
     File "<template>", line 13, in root
     File "/usr/local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326, 
in getattr
       value = getattr(obj, attribute)
     File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 910, 
in __getattr__
       return self._fail_with_undefined_error()
     File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, 
in _fail_with_undefined_error
       raise self._undefined_exception(self._undefined_message)
   jinja2.exceptions.UndefinedError: 
'airflow.models.mappedoperator.MappedOperator object' has no attribute 
'parameters'
   ```
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-snowflake==2.7.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Astronomer Runtime 5.0.3
   
   ### Anything else
   
   Even though using the `{{ task.<operator attr> }}` method does not work for 
mapped tasks, there is a workaround. Given the `SnowflakeOperator` example from 
above attempting to execute the query: `SELECT * FROM {{ task.parameters.tbl 
}};`, users can modify the query to `SELECT * FROM {{ 
task.mapped_kwargs.parameters[ti.map_index].tbl }};` for successful execution.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to