magdagultekin opened a new issue, #25815:
URL: https://github.com/apache/airflow/issues/25815

   ### Apache Airflow version
   
   2.3.3
   
   ### What happened
   
   `SQLTableCheckOperator` fails with `psycopg2.errors.SyntaxError: subquery in FROM must have an alias` when used with Postgres.
   
   ### What you think should happen instead
   
   The check should run against Postgres without a SQL error. Instead, the task fails. From the logs:
   ```
   [2022-08-19, 09:28:14 UTC] {taskinstance.py:1910} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 296, in execute
       records = hook.get_first(self.sql)
     File "/usr/local/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 178, in get_first
       cur.execute(sql)
   psycopg2.errors.SyntaxError: subquery in FROM must have an alias
   LINE 1: SELECT MIN(row_count_check) FROM (SELECT CASE WHEN COUNT(*) ...
                                            ^
   HINT:  For example, FROM (SELECT ...) [AS] foo.
   ```
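
For context, the HINT in the log says the Postgres server rejects a subquery in `FROM` that has no alias. The snippet below is a minimal sketch of the query shape with an alias added. `build_table_check_sql` and the alias name `check_table` are hypothetical names chosen for illustration and are not the provider's actual code. The `CASE WHEN ... THEN 1 ELSE 0 END` form is also an assumption, since the log truncates the generated statement.

```python
def build_table_check_sql(table, checks, alias="check_table"):
    """Build one query that evaluates every check against `table`.

    Hypothetical helper: mirrors the shape of the statement in the log,
    but gives the subquery in FROM an alias.
    """
    # Outer query: one MIN(...) per check, so any failing check yields 0.
    outer_columns = ", ".join(f"MIN({name})" for name in checks)
    # Inner query: each check becomes a 1/0 column named after the check.
    inner_columns = ", ".join(
        f"CASE WHEN {spec['check_statement']} THEN 1 ELSE 0 END AS {name}"
        for name, spec in checks.items()
    )
    # The trailing "AS <alias>" is what the Postgres HINT asks for.
    return (
        f"SELECT {outer_columns} "
        f"FROM (SELECT {inner_columns} FROM {table}) AS {alias}"
    )


sql = build_table_check_sql(
    "employees",
    {"row_count_check": {"check_statement": "COUNT(*) >= 3"}},
)
print(sql)
```

The sketch interpolates the table name and check statements directly into the string, so it is only suitable for trusted input. It also does not handle mixing aggregate and non-aggregate checks in one inner `SELECT`.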
   
   ### How to reproduce
   
   ```python
   import pendulum
   from datetime import timedelta
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.providers.common.sql.operators.sql import (
       SQLColumnCheckOperator,
       SQLTableCheckOperator,
   )
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   from airflow.providers.postgres.hooks.postgres import PostgresHook
   
   _POSTGRES_CONN = "postgresdb"
   _TABLE_NAME = "employees"
   
   default_args = {
       "owner": "cs",
       "retries": 3,
       "retry_delay": timedelta(seconds=15),
       }
   
   with DAG(
       dag_id="sql_data_quality",
       start_date=pendulum.datetime(2022, 8, 1, tz="UTC"),
       schedule_interval=None,
   ) as dag:
   
       create_table = PostgresOperator(
           task_id="create_table",
           postgres_conn_id=_POSTGRES_CONN,
           sql=f"""
           CREATE TABLE IF NOT EXISTS {_TABLE_NAME} (
               employee_name VARCHAR NOT NULL,
               employment_year INT NOT NULL
           );
           """
       )
   
       populate_data = PostgresOperator(
           task_id="populate_data",
           postgres_conn_id=_POSTGRES_CONN,
           sql=f"""
               INSERT INTO {_TABLE_NAME} VALUES ('Adam', 2021);
               INSERT INTO {_TABLE_NAME} VALUES ('Chris', 2021);
               INSERT INTO {_TABLE_NAME} VALUES ('Frank', 2021);
               INSERT INTO {_TABLE_NAME} VALUES ('Fritz', 2021);
               INSERT INTO {_TABLE_NAME} VALUES ('Magda', 2022);
               INSERT INTO {_TABLE_NAME} VALUES ('Phil', 2021);
           """,
       )
   
       check_row_count = SQLTableCheckOperator(
           task_id="check_row_count",
           conn_id=_POSTGRES_CONN,
           table=_TABLE_NAME,
           checks={
               "row_count_check": {"check_statement": "COUNT(*) >= 3"}
           },
       )
   
       drop_table = PostgresOperator(
           task_id="drop_table",
           trigger_rule="all_done",
           postgres_conn_id=_POSTGRES_CONN,
           sql="""
               DROP TABLE employees;
           """,
       )
   
       create_table >> populate_data >> check_row_count >> drop_table
   ```
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-common-sql==1.0.0`
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
