Mrhacktivist opened a new issue, #38623:
URL: https://github.com/apache/airflow/issues/38623
### Apache Airflow Provider(s)
postgres
### Versions of Apache Airflow Providers
5.5.1
### Apache Airflow version
2.6.3
### Operating System
amazon linux 2
### Deployment
Amazon (AWS) MWAA
### Deployment details
MWAA 2.6.3
### What happened
When we iterate over a predefined list of values and pass each value to a function that uses it as a table-name parameter in a query, the query is printed in the task logs.
Code example:
```
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

# Define the function to execute SQL script using PostgresHook
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]

# Define the DAG
dag = DAG(
    dag_id="vaibhav",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
    catchup=False
)

# Fetching records within the context of the DAG
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = ["vaibhav", "nikhil", "kunal"]
ods_table_list = []
for o_table in records:
    if o_table is not None:
        ods_table_list.append(o_table)

if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```
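As an aside, interpolating the value with `.format()` builds the SQL by string substitution, which is injection-prone; DB-API hooks such as `get_records` also accept a `parameters` argument so the driver binds the value (and bound parameters then appear in the "Running statement" log line). Below is a standalone sketch of driver-side parameter binding; it uses the stdlib `sqlite3` driver purely so it is runnable here, while psycopg2 (used by `PostgresHook`) takes `%s` placeholders rather than `?`:

```python
import sqlite3

# Standalone illustration of driver-side parameter binding.
# With PostgresHook the equivalent would be:
#   pg_hook.get_records(
#       "SELECT * FROM public.test WHERE ods_table_name = %s",
#       parameters=(table_name,))
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (ods_table_name TEXT)")
conn.executemany("INSERT INTO test VALUES (?)",
                 [("vaibhav",), ("nikhil",), ("kunal",)])

table_name = "vaibhav"
rows = conn.execute(
    "SELECT * FROM test WHERE ods_table_name = ?",  # value is bound, not formatted in
    (table_name,),
).fetchall()
print(rows)  # [('vaibhav',)]
```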
As the example above shows, when a predefined list is passed, the SQL query issued in run_stage_queries() is printed in the task logs by
[sql.py](https://github.com/apache/airflow/blob/main/airflow/providers/common/sql/hooks/sql.py),
and the method responsible for it is:
```
def _run_command(self, cur, sql_statement, parameters):
    """Run a statement using an already open cursor."""
    if self.log_sql:
        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
```
Output:
```
[2024-03-26, 12:15:37 UTC] {{sql.py:374}} INFO - Running statement: SELECT * FROM public.test WHERE ods_table_name = 'vaibhav', parameters: None
[2024-03-26, 12:15:37 UTC] {{sql.py:383}} INFO - Rows affected: 1
```
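For reference, the "Running statement" line is only emitted when the hook's `log_sql` flag is true (it defaults to true on `DbApiHook`). A minimal standalone sketch of that gating, not the actual Airflow class:

```python
import logging

# Minimal sketch of the gating in _run_command: the "Running statement"
# line is only emitted when log_sql is true.
class SketchHook:
    def __init__(self, log_sql=True):
        self.log_sql = log_sql
        self.log = logging.getLogger("sketch_hook")

    def _run_command(self, cur, sql_statement, parameters=None):
        if self.log_sql:
            self.log.info("Running statement: %s, parameters: %s",
                          sql_statement, parameters)
        # cur.execute(...) would follow here; omitted in this sketch


# Capture log records to show the difference between the two flags.
records_seen = []

class _Capture(logging.Handler):
    def emit(self, record):
        records_seen.append(record.getMessage())

logger = logging.getLogger("sketch_hook")
logger.setLevel(logging.INFO)
logger.addHandler(_Capture())

SketchHook(log_sql=True)._run_command(None, "SELECT 1", None)
SketchHook(log_sql=False)._run_command(None, "SELECT 2", None)
print(records_seen)  # only the first statement was logged
```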
Issue -
If instead we fetch the records with a query at the top level, e.g.
`sql_ods = "SELECT ods_table_name FROM public.test;"` followed by
`records = pg_hook.get_records(sql_ods)`, iterate through `records` to build a list,
create a dynamic task per entry, and pass each entry as the table name to
run_stage_queries(), then the query is not returned in the task log.
Please find the code example and output below:
Code -
```
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

# Define the function to execute SQL script using PostgresHook
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    print(ods_table_list)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]

# Define the DAG
dag = DAG(
    dag_id="vaibhavvvvv",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
    catchup=False
)

# Fetching records within the context of the DAG
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = pg_hook.get_records(sql_ods)
ods_table_list = [item[0] for item in records]

if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```
Output :
```
[2024-03-26, 12:16:30 UTC] {{standard_task_runner.py:85}} INFO - Job 52: Subtask vaibhav_stage
[2024-03-26, 12:16:30 UTC] {{taskinstance.py:1545}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='vaibhavvvvv' AIRFLOW_CTX_TASK_ID='vaibhav_stage' AIRFLOW_CTX_EXECUTION_DATE='2024-03-26T12:16:14.873032+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-26T12:16:14.873032+00:00'
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - vaibhav
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - [('vaibhav',), ('nikhil',), ('kunal',)]
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - ['vaibhav', 'nikhil', 'kunal']
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - Executing SQL query: [('vaibhav',)]
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - &&&& vaibhav
[2024-03-26, 12:16:30 UTC] {{python.py:183}} INFO - Done. Returned value was: None
```
As the output above shows, the "Running statement" line is missing from the task logs.
Why is the query not logged when the list is built dynamically?
### What you think should happen instead
When we build the list dynamically by executing a query, iterate through it, and pass
each value as the table name to the query inside the function, the task logs should
contain the "Running statement: <query>" line, just as they do for a predefined list.
### How to reproduce
Pass the dynamic list as below:
```
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = pg_hook.get_records(sql_ods)
ods_table_list = [item[0] for item in records]
```
dynamic task creation
```
if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```
to the method -
```
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    print(ods_table_list)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]
```
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)