gnthibault opened a new issue, #41821:
URL: https://github.com/apache/airflow/issues/41821
### Apache Airflow version
2.9.3
### If "Other Airflow 2 version" selected, which one?
2.9.1
### What happened?
I got the following error:
[2024-08-28, 11:52:27 CEST] {taskinstance.py:2907} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
1910, in _execute_context
self.dialect.do_execute(
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type
integer: "manual__2024-08-28T09:49:23.581417+00:00"
LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py",
line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py",
line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py",
line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py",
line 269, in execute
return self.do_branch(context, super().execute(context))
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py",
line 400, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py",
line 235, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py",
line 252, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/gcs/dags/sap_ecc_incremental.py", line 210, in
get_branch
prev_dag_run = dag_run.get_previous_scheduled_dagrun(dag_run.run_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py",
line 115, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/utils/session.py", line
79, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/dagrun.py", line
726, in get_previous_scheduled_dagrun
dag_run = session.get(DagRun, dag_run_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line
2853, in get
return self._get_impl(
^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line
2975, in _get_impl
return db_load_fn(
^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line
530, in load_on_pk_identity
session.execute(
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line
1717, in execute
result = conn._execute_20(statement, params or {}, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line
334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
1953, in _execute_context
self._handle_dbapi_exception(
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
2134, in _handle_dbapi_exception
util.raise_(
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line
211, in raise_
raise exception
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line
1910, in _execute_context
self.dialect.do_execute(
File
"/opt/python3.11/lib/python3.11/site-packages/sqlalchemy/engine/default.py",
line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation)
invalid input syntax for type integer:
"manual__2024-08-28T09:49:23.581417+00:00"
LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
^
[SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id,
dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at,
dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS
dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS
dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id,
dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS
dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS
dag_run_data_interval_start, dag_run.data_interval_end AS
dag_run_data_interval_end, dag_run.last_scheduling_decision AS
dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash,
dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS
dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number
FROM dag_run
WHERE dag_run.id = %(pk_1)s]
[parameters: {'pk_1': 'manual__2024-08-28T09:49:23.581417+00:00'}]
(Background on this error at: https://sqlalche.me/e/14/9h9h)
[2024-08-28, 11:52:27 CEST] {taskinstance.py:1206} INFO - Marking task as
FAILED. dag_id=sap_ecc_incremental, task_id=datetime_branch,
run_id=manual__2024-08-28T09:49:23.581417+00:00,
execution_date=20240828T094923, start_date=20240828T095226,
end_date=20240828T095227
[2024-08-28, 11:52:27 CEST] {standard_task_runner.py:110} ERROR - Failed to
execute job 1854555 for task datetime_branch
((psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type
integer: "manual__2024-08-28T09:49:23.581417+00:00"
LINE 3: WHERE dag_run.id = 'manual__2024-08-28T09:49:23.581417+00:00...
^
[SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id,
dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at,
dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS
dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS
dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id,
dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS
dag_run_run_type, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS
dag_run_data_interval_start, dag_run.data_interval_end AS
dag_run_data_interval_end, dag_run.last_scheduling_decision AS
dag_run_last_scheduling_decision, dag_run.dag_hash AS dag_run_dag_hash,
dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS
dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number
FROM dag_run
WHERE dag_run.id = %(pk_1)s]
[parameters: {'pk_1': 'manual__2024-08-28T09:49:23.581417+00:00'}]
(Background on this error at: https://sqlalche.me/e/14/9h9h); 36085)
[2024-08-28, 11:52:27 CEST] {local_task_job_runner.py:240} INFO - Task
exited with return code 1
[2024-08-28, 11:52:27 CEST] {local_task_job_runner.py:222} ▲▲▲ Log group end
Version: 2.9.1+composer
### What you think should happen instead?
I think the issue comes from this line:
https://github.com/apache/airflow/blob/9f30a41874454696ae2b215b2d86cb9a62968006/airflow/models/dagrun.py#L711
Type is int, should be str
### How to reproduce
# Airflow specifics
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator
dag = DAG(
dag_id=f'dag')
conditional_datetime_check = BranchPythonOperator(
task_id='my_branch',
python_callable=get_branch,
provide_context=True,
dag=dag
)
def get_branch(**kwargs):
dag_run = kwargs['dag_run']
prev_dag_run = dag_run.get_previous_scheduled_dagrun(dag_run.run_id)
### Operating System
linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Google Cloud Composer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]