gnthibault opened a new issue, #41821:
URL: https://github.com/apache/airflow/issues/41821

   ### Apache Airflow version
   
   2.9.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.9.1
   
   ### What happened?
   
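   A `BranchPythonOperator` task that calls `dag_run.get_previous_scheduled_dagrun(dag_run.run_id)` failed with the following error: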
   [2024-08-28, 11:52:27 CEST] {taskinstance.py:2907} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
       self.dialect.do_execute(
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "manual__2024-08-28T09:49:23.581417+00:00"
   LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
                              ^
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 269, in execute
       return self.do_branch(context, super().execute(context))
                                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 235, in execute
       return_value = self.execute_callable()
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 252, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/gcs/dags/sap_ecc_incremental.py", line 210, in get_branch
       prev_dag_run = dag_run.get_previous_scheduled_dagrun(dag_run.run_id)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 115, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/dagrun.py", line 726, in get_previous_scheduled_dagrun
       dag_run = session.get(DagRun, dag_run_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2853, in get
       return self._get_impl(
              ^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
       return db_load_fn(
              ^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
       session.execute(
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
       self.dialect.do_execute(
     File "/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type integer: "manual__2024-08-28T09:49:23.581417+00:00"
   LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
                              ^
   
   [SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start, dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash, dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number 
   FROM dag_run 
   WHERE dag_run.id = %(pk_1)s]
   [parameters: {'pk_1': 'manual__2024-08-28T09:49:23.581417+00:00'}]
   (Background on this error at: https://sqlalche.me/e/14/9h9h)
   [2024-08-28, 11:52:27 CEST] {taskinstance.py:1206} INFO - Marking task as FAILED. dag_id=sap_ecc_incremental, task_id=datetime_branch, run_id=manual__2024-08-28T09:49:23.581417+00:00, execution_date=20240828T094923, start_date=20240828T095226, end_date=20240828T095227
   [2024-08-28, 11:52:27 CEST] {standard_task_runner.py:110} ERROR - Failed to execute job 1854555 for task datetime_branch ((psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type integer: "manual__2024-08-28T09:49:23.581417+00:00"
   LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
                              ^
   
   [SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start, dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash, dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number 
   FROM dag_run 
   WHERE dag_run.id = %(pk_1)s]
   [parameters: {'pk_1': 'manual__2024-08-28T09:49:23.581417+00:00'}]
   (Background on this error at: https://sqlalche.me/e/14/9h9h); 36085)
   [2024-08-28, 11:52:27 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
   [2024-08-28, 11:52:27 CEST] {local_task_job_runner.py:222} ▲▲▲ Log group end
   
   
   Version: 2.9.1+composer
   
   ### What you think should happen instead?
   
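   I think the issue comes from this line:
   https://github.com/apache/airflow/blob/9f30a41874454696ae2b215b2d86cb9a62968006/airflow/models/dagrun.py#L711
   The `dag_run_id` argument is typed as `int`; I think it should be `str`, because the value I pass (`dag_run.run_id`) is a string.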
   
   ### How to reproduce
   
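   # Airflow specifics
   from datetime import datetime
   
   from airflow.models import DAG
   from airflow.operators.python import BranchPythonOperator
   
   
   def get_branch(**kwargs):
       dag_run = kwargs['dag_run']
       # Passing run_id (a str) here raises the DataError shown above.
       prev_dag_run = dag_run.get_previous_scheduled_dagrun(dag_run.run_id)
   
   
   # start_date is an arbitrary placeholder so the task can be triggered manually.
   dag = DAG(dag_id='dag', start_date=datetime(2024, 1, 1), schedule=None)
   
   # provide_context is dropped: it is deprecated and not needed in Airflow 2.
   conditional_datetime_check = BranchPythonOperator(
       task_id='my_branch',
       python_callable=get_branch,
       dag=dag,
   )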
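A possible workaround, sketched under the assumption that the 2.9.x method really expects the integer primary key (the traceback shows it calling `session.get(DagRun, dag_run_id)`, which queries `dag_run.id`): pass `dag_run.id` instead of `dag_run.run_id`. The returned task ids `full_load` and `incremental_load` are hypothetical placeholders, not names from my DAG.

```python
def get_branch(**kwargs):
    """Branch callable: choose a downstream task id from the previous scheduled run.

    Assumes kwargs['dag_run'] exposes an integer `id` attribute and a
    `get_previous_scheduled_dagrun(dag_run_id)` method that looks the run up
    by that integer primary key, as the traceback suggests.
    """
    dag_run = kwargs["dag_run"]
    # Pass the integer primary key (dag_run.id), not the string run_id.
    prev_dag_run = dag_run.get_previous_scheduled_dagrun(dag_run.id)
    if prev_dag_run is None:
        # No earlier scheduled run was found.
        return "full_load"  # hypothetical task id
    return "incremental_load"  # hypothetical task id
```

The callable is wired into `BranchPythonOperator` exactly as in the reproduction; only the argument passed to `get_previous_scheduled_dagrun` changes.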
   
   
   ### Operating System
   
   linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


