Robstaa opened a new issue #15547:
URL: https://github.com/apache/airflow/issues/15547


   **Apache Airflow version**:
   2.0.1
   
   **Environment**:
   - **OS** (e.g. from /etc/os-release): macOS Big Sur 11.2.3
   - **Kernel** (e.g. `uname -a`): Darwin Kernel Version 20.3.0
   
   **What happened**:
   
   When executing a `PostgresOperator` within a `PythonOperator` through the new TaskFlow API, the Jinja templating does not seem to be applied. As usual, I am passing the `PostgresOperator` a path to a SQL file as the `sql` parameter. As a standalone task, the `PostgresOperator` does what is expected: it resolves the SQL path and executes the SQL code it reads from the file. It also works with `params`.
   
   **What you expected to happen**:
   I expect the Jinja templating to also be applied when the `PostgresOperator` is executed within a `@task`-decorated function.
   
   **How to reproduce it**:
   1. Define a connection `pg_connection` to a Postgres database, or use the default connection.
   2. Within that database, create a table `test_table`.
   3. In `dags/test_dag.py`:
   ```python
   from airflow.decorators import dag, task
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   from airflow.operators.python import get_current_context
   from airflow.utils.dates import days_ago
   
   
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['[email protected]'],
       'email_on_failure': True,
       'email_on_retry': False,
       'retries': 1,
       'start_date': days_ago(2)
   }
   
   @dag(default_args=DEFAULT_ARGS, schedule_interval=None)
   def test_dag():
   
       outside_pg = PostgresOperator(
           task_id='outside_pg',
           postgres_conn_id='pg_connection',
           sql='sql/test_sql.sql'
       )
   
       @task()
       def inside_pg():
           context = get_current_context()
           inside_pg = PostgresOperator(
               task_id='inside_pg',
               postgres_conn_id='pg_connection',
               sql='sql/test_sql.sql'
           )
           inside_pg.execute(context=context)
   
       execute_inside_pg = inside_pg()
       outside_pg >> execute_inside_pg
   
   dag = test_dag()
   ```
   4. In `dags/sql/test_sql.sql`:
   ```sql
   SELECT * FROM test_table;
   ```
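
   For what it's worth, the workaround I am experimenting with is to trigger the rendering manually before calling `execute()`. This is only a sketch under the assumption that `render_template_fields()` can resolve the relative `sql/` path from inside the callable (it may need the DAG's template search path); `inside_pg_rendered` is just an illustrative name:

   ```python
   @task()
   def inside_pg_rendered():
       context = get_current_context()
       op = PostgresOperator(
           task_id='inside_pg_rendered',
           postgres_conn_id='pg_connection',
           sql='sql/test_sql.sql',
       )
       # Manually apply the Jinja rendering that a normal task run would
       # perform before execute(); without this, op.sql stays the literal
       # path string 'sql/test_sql.sql'.
       op.render_template_fields(context)
       op.execute(context=context)
   ```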
   
   **The error logs I get**:
   ```
   airflow dags test test_dag -1
   [2021-04-27 13:01:25,488] {dagbag.py:448} INFO - Filling up the DagBag from /Users/robinzuschke/code/valyria/airflow/dags
   [2021-04-27 13:01:25,649] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.outside_pg 2021-04-01 00:00:00+00:00 [queued]>']
   [2021-04-27 13:01:30,669] {taskinstance.py:1257} INFO - Exporting the following env vars:
   [email protected]
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=test_dag
   AIRFLOW_CTX_TASK_ID=outside_pg
   AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
   [2021-04-27 13:01:30,675] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
   [2021-04-27 13:01:30,682] {dbapi.py:180} INFO - Running statement: SELECT * FROM test_table;, parameters: None
   [2021-04-27 13:01:30,686] {dbapi.py:186} INFO - Rows affected: 0
   [2021-04-27 13:01:30,690] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=outside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110130
   [2021-04-27 13:01:30,704] {taskinstance.py:1220} INFO - 0 downstream tasks scheduled from follow-on schedule check
   [2021-04-27 13:01:30,722] {backfill_job.py:388} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
   [2021-04-27 13:01:30,744] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [queued]>']
   [2021-04-27 13:01:35,615] {taskinstance.py:1257} INFO - Exporting the following env vars:
   [email protected]
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=test_dag
   AIRFLOW_CTX_TASK_ID=inside_pg
   AIRFLOW_CTX_EXECUTION_DATE=2021-04-01T00:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-04-01T00:00:00+00:00
   [2021-04-27 13:01:35,619] {base.py:74} INFO - Using connection to: id: pg_connection. Host: localhost, Port: 5432, Schema: postgres, Login: postgres, Password: None, extra: None
   [2021-04-27 13:01:35,624] {dbapi.py:180} INFO - Running statement: sql/test_sql.sql, parameters: None
   [2021-04-27 13:01:35,625] {taskinstance.py:1455} ERROR - syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^

   Traceback (most recent call last):
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
       return_value = self.python_callable(*self.op_args, **self.op_kwargs)
     File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
       inside_pg.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
       self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
       cur.execute(sql_statement)
   psycopg2.errors.SyntaxError: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^

   [2021-04-27 13:01:35,630] {taskinstance.py:1503} INFO - Marking task as UP_FOR_RETRY. dag_id=test_dag, task_id=inside_pg, execution_date=20210401T000000, start_date=20210427T104518, end_date=20210427T110135
   [2021-04-27 13:01:35,643] {debug_executor.py:87} ERROR - Failed to execute task: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^
   .
   Traceback (most recent call last):
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
       ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/operators/python.py", line 233, in execute
       return_value = self.python_callable(*self.op_args, **self.op_kwargs)
     File "/Users/robinzuschke/code/valyria/airflow/dags/test_dag.py", line 34, in inside_pg
       inside_pg.execute(context=context)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/providers/postgres/operators/postgres.py", line 71, in execute
       self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
     File "/Users/robinzuschke/.pyenv/versions/3.7.7/envs/airflow/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 184, in run
       cur.execute(sql_statement)
   psycopg2.errors.SyntaxError: syntax error at or near "sql"
   LINE 1: sql/test_sql.sql
           ^

   [2021-04-27 13:01:35,651] {backfill_job.py:219} ERROR - Task instance <TaskInstance: test_dag.inside_pg 2021-04-01 00:00:00+00:00 [failed]> failed
   [2021-04-27 13:01:35,655] {dagrun.py:430} ERROR - Marking run <DagRun test_dag @ 2021-04-01 00:00:00+00:00: backfill__2021-04-01T00:00:00+00:00, externally triggered: False> failed
   [2021-04-27 13:01:35,657] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
   Some task instances failed:
   DAG ID    Task ID    Execution date               Try number
   --------  ---------  -------------------------  ------------
   test_dag  inside_pg  2021-04-01 00:00:00+00:00             1
   (3.7.7/envs/airflow)
   ```
   Any help is appreciated!

