gecko655 opened a new issue, #32804:
URL: https://github.com/apache/airflow/issues/32804
### Apache Airflow version
Other Airflow 2 version (please specify below)
### Airflow version
2.5.3+composer (The latest available airflow version in Cloud Composer)
### What happened
`DagRun.find()` (the `find` method of the SQLAlchemy model `DagRun`) fails with
`TypeError: 'DateTime' object is not iterable` when it is passed an
`execution_date` obtained directly from the task context:
```py
def delete_previous_dagrun_func(dag_to_delete, **context):
    execution_date = context.get('execution_date')
    dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                execution_date=execution_date)
```
### What is the cause
On closer inspection, `execution_date` is a `lazy_object_proxy.Proxy`, and the
`is_container()` function used in `DagRun.find()` decides whether a value is a
"container" by checking for an `__iter__` attribute.
`lazy_object_proxy.Proxy` defines `__iter__`, so `execution_date` is treated as
a container and passed to SQLAlchemy's `in_()` as if it were a list of dates.
SQLAlchemy then tries to iterate over the wrapped `DateTime`, which raises the
"not iterable" error.
https://github.com/apache/airflow/blob/6313e5293280773aed7598e1befb8d371e8f5614/airflow/models/dagrun.py#L406-L409
https://github.com/apache/airflow/blob/6313e5293280773aed7598e1befb8d371e8f5614/airflow/utils/helpers.py#L117-L120
I think `is_container()` should have another conditional branch to deal with
`lazy_object_proxy.Proxy`.
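The mis-detection can be reproduced without Airflow. The sketch below is only an illustration: `FakeProxy` is a hypothetical stand-in for `lazy_object_proxy.Proxy`, `is_container()` is a simplified version of the check described above, and `is_container_unwrapping()` is one possible shape for the extra branch, not necessarily what Airflow should adopt.

```python
from datetime import datetime
from typing import Any


class FakeProxy:
    """Hypothetical stand-in for lazy_object_proxy.Proxy (not the real class)."""

    def __init__(self, factory):
        self._factory = factory

    @property
    def __wrapped__(self):
        # Build the wrapped object on access, as a lazy proxy would.
        return self._factory()

    def __iter__(self):
        # Forward iteration to the wrapped object, whatever its type is.
        return iter(self.__wrapped__)


def is_container(obj: Any) -> bool:
    # Simplified version of the check described above.
    return hasattr(obj, "__iter__") and not isinstance(obj, str)


def is_container_unwrapping(obj: Any) -> bool:
    # Assumed fix: look through the proxy before testing for __iter__.
    if isinstance(obj, FakeProxy):
        obj = obj.__wrapped__
    return hasattr(obj, "__iter__") and not isinstance(obj, str)


proxied_date = FakeProxy(lambda: datetime(2023, 7, 24))
proxied_list = FakeProxy(lambda: [datetime(2023, 7, 24)])

print(is_container(proxied_date))             # True: the proxy class defines __iter__
print(is_container_unwrapping(proxied_date))  # False: a datetime is not iterable
print(is_container_unwrapping(proxied_list))  # True: a proxied list is still a container
```

Unwrapping first keeps the existing behaviour for real lists and tuples, while a proxied scalar is judged by the object it wraps.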
### Workaround
Unwrapping the `lazy_object_proxy.Proxy` before passing it to `DagRun.find()` works:
```py
def delete_previous_dagrun_func(dag_to_delete, **context):
    execution_date = context.get('execution_date')
    dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                execution_date=execution_date.__wrapped__)
```
https://github.com/ionelmc/python-lazy-object-proxy/blob/c56c68bda23b8957abbc2fef3d21f32dd44b7f93/src/lazy_object_proxy/simple.py#L76-L83
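Accessing `__wrapped__` directly raises `AttributeError` if the value is a plain datetime, for example when the callable is invoked with a hand-built context in a test. A slightly more defensive variant of the workaround is sketched below; `unwrap_proxy` is a hypothetical helper name and `FakeProxy` is a stand-in for the real proxy class, used only to keep the example self-contained.

```python
from datetime import datetime


class FakeProxy:
    """Hypothetical stand-in exposing __wrapped__ like lazy_object_proxy.Proxy."""

    def __init__(self, target):
        self.__wrapped__ = target


def unwrap_proxy(value):
    # Return the wrapped object if there is one, otherwise the value unchanged.
    return getattr(value, "__wrapped__", value)


plain = datetime(2023, 7, 24)
print(unwrap_proxy(FakeProxy(plain)) is plain)  # True: the proxy is unwrapped
print(unwrap_proxy(plain) is plain)             # True: plain values pass through
```

In the task callable this would be used as `execution_date=unwrap_proxy(context.get('execution_date'))`.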
### What you think should happen instead
The value returned by `context.get('execution_date')` should be usable
directly as an argument to `DagRun.find()`.
### How to reproduce
```py
from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python import PythonOperator

dag_to_delete = DAG('DAG_TO_DELETE')
dag = DAG('DAG')


def delete_previous_dagrun_func(dag_to_delete, **context):
    execution_date = context.get('execution_date')
    dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
                                execution_date=execution_date)
    # do something for `dagruns_today` (Delete the dagruns for today)


op = PythonOperator(
    task_id='Delete_Previous_DagRun',
    python_callable=delete_previous_dagrun_func,
    op_args=[dag_to_delete],
    provide_context=True,
    dag=dag)
```
This code produces the following error at the `DagRun.find` call:
```
[2023-07-24, 19:09:06 JST] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 210, in execute
    branch = super().execute(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/*******.py", line ***, in delete_previous_dagrun_func
    dagruns_today = DagRun.find(dag_id=dag_to_delete.dag_id,
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dagrun.py", line 386, in find
    qry = qry.filter(cls.execution_date.in_(execution_date))
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
    return self.operate(in_op, other)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 317, in operate
    return op(self.comparator, *other, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
    return a.in_(b)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
    return self.operate(in_op, other)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/properties.py", line 426, in operate
    return op(self.__clause_element__(), *other, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
    return a.in_(b)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
    return self.operate(in_op, other)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 870, in operate
    return op(self.comparator, *other, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1423, in in_op
    return a.in_(b)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 641, in in_
    return self.operate(in_op, other)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/type_api.py", line 1373, in operate
    return super(TypeDecorator.Comparator, self).operate(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/type_api.py", line 77, in operate
    return o[0](self.expr, op, *(other + o[1:]), **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/default_comparator.py", line 159, in _in_impl
    seq_or_selectable = coercions.expect(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py", line 193, in expect
    resolved = impl._literal_coercion(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py", line 573, in _literal_coercion
    element = list(element)
TypeError: 'DateTime' object is not iterable
```
### Operating System
Google Cloud Composer (Ubuntu 20.04.6 LTS on Kubernetes)
### Versions of Apache Airflow Providers
No relevant providers
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Google Cloud Composer image version: `composer-2.3.4-airflow-2.5.3`
### Anything else
I have recently begun preparing to upgrade Airflow from 1.10 to the 2.x series.
The code in the "How to reproduce" section still works in an Airflow 1.10
environment.
I would like to know whether this is an intentional or an accidental
incompatibility between 1.10 and 2.x.
I am willing to submit a PR if the fix is very simple (such as changing one
line of code).
If a larger change is needed, I do not think I will have enough time to submit
a PR promptly.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)