aran3 opened a new issue #19204:
URL: https://github.com/apache/airflow/issues/19204


   ### Apache Airflow version
   
   2.2.0 (latest released)
   
   ### Operating System
   
   Red Hat Enterprise Linux Server 7.6 (Maipo)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-postgres==2.3.0
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   `psycopg2.errors.SyntaxError` is raised when using a custom "external task 
sensor" with the following code:
   
   ```python
           TI = TaskInstance
           DR = DagRun
           if self.external_task_id:
               last_instance = session.query(TI).filter(
                   TI.dag_id == self.external_dag_id,
                   TI.task_id == self.external_task_id,
                   TI.execution_date.in_(dttm_filter)
               ).order_by(TI.execution_date.desc()).first()
           else:
               last_instance = session.query(DR).filter(
                   DR.dag_id == self.dag_id,
                   DR.execution_date.in_(dttm_filter)
               ).order_by(DR.execution_date.desc()).first()
           return last_instance.state
   ```
   
   This code was modified from 
https://github.com/apache/airflow/blob/2.2.0/airflow/sensors/external_task.py#L231
and worked on Airflow 1.10.7 through 2.1.4. 
   
   Error details: 
   ```
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.SyntaxError) syntax error 
at or near "DESC"
   LINE 7: ..._id = task_instance.run_id AND dag_run.execution_date DESC) 
                                                                    ^
   
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, 
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
task_instance_dag_id, task_instance.run_id AS task_instance_run_id, 
task_instance.start_date AS task_instance_start_date, task_instance.end_date AS 
task_instance_end_date, task_instance.duration AS task_instance_duration, 
task_instance.state AS task_instance_state, task_instance.max_tries AS 
task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
task_instance_job_id, task_instance.pool AS task_instance_pool, 
task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
task_instance_queue, task_instance.priority_weight AS 
task_instance_priority_weight, task_instance.operator AS 
task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, 
task_instance.queued_by_job_id AS task_instance_queued_by_job_id, 
task_instance.pid AS task_instance_pid, task_instance.executor_config AS 
task_instance_executor_config, task_instance.external_executor_id AS 
task_instance_external_executor_id, task_instance.trigger_id AS 
task_instance_trigger_id, task_instance.trigger_timeout AS 
task_instance_trigger_timeout, task_instance.next_method AS 
task_instance_next_method, task_instance.next_kwargs AS 
task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS 
dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS 
dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, 
dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS 
dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, 
dag_run_1.creating_job_id AS dag_run_1_creating_job_id, 
dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS 
dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, 
dag_run_1.data_interval_start AS dag_run_1_data_interval_start, 
dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS 
dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash 
   FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = 
task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id 
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = 
%(task_id_1)s AND (EXISTS (SELECT 1 
   FROM dag_run 
   WHERE dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = 
task_instance.run_id AND dag_run.execution_date IN (%(execution_date_1)s))) 
ORDER BY EXISTS (SELECT 1 
   FROM dag_run 
   WHERE dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = 
task_instance.run_id AND dag_run.execution_date DESC) 
    LIMIT %(param_1)s]
   ```
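
In the failing statement above, `DESC` ends up inside the `EXISTS (...)` subquery of the `ORDER BY` clause, which PostgreSQL rejects. For comparison, here is a minimal sketch of the query shape the sensor code appears to intend: join `task_instance` to `dag_run` on `dag_id` and `run_id`, then filter and sort on `dag_run.execution_date` directly. It runs against an in-memory SQLite database with a cut-down, hypothetical schema (only the columns the query touches). The function name `last_task_state`, the sample rows, and the dates are all made up for illustration; this is not Airflow's schema or its fix.

```python
import sqlite3

# Hypothetical, cut-down schema: only the columns the query touches.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag_run (
        dag_id TEXT, run_id TEXT, execution_date TEXT, state TEXT
    );
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT
    );
    INSERT INTO dag_run VALUES
        ('upstream', 'run_1', '2021-10-24T00:00:00', 'success'),
        ('upstream', 'run_2', '2021-10-25T00:00:00', 'running');
    INSERT INTO task_instance VALUES
        ('upstream', 'load', 'run_1', 'success'),
        ('upstream', 'load', 'run_2', 'running');
""")


def last_task_state(conn, dag_id, task_id, execution_dates):
    """Return the state of the most recent matching task instance, or None."""
    if not execution_dates:
        return None
    # One "?" placeholder per date for the IN (...) list.
    placeholders = ", ".join("?" for _ in execution_dates)
    sql = f"""
        SELECT ti.state
        FROM task_instance AS ti
        JOIN dag_run AS dr
          ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
        WHERE ti.dag_id = ? AND ti.task_id = ?
          AND dr.execution_date IN ({placeholders})
        ORDER BY dr.execution_date DESC  -- sort on the joined column itself
        LIMIT 1
    """
    row = conn.execute(sql, (dag_id, task_id, *execution_dates)).fetchone()
    return row[0] if row else None
```

Unlike the sensor snippet, which calls `.state` on the result of `.first()` unconditionally, this sketch returns `None` when no row matches.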
   
   ### What you expected to happen
   
   Either the query should work (best), or `TI.execution_date` should be more 
strictly-checked and documented to only be usable for some actions. 
   
   Although a deprecation warning is raised when accessing `TI.execution_date`, 
the ExternalTaskSensor uses it and it is part of the TaskInstance model. 
   
   ### How to reproduce
   
   Reproduces consistently. 
   See the code under "What happened"; I can provide the full sensor code if needed. 
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
