tirkarthi commented on code in PR #28256:
URL: https://github.com/apache/airflow/pull/28256#discussion_r1112980514
##########
airflow/dag_processing/manager.py:
##########
@@ -782,7 +782,11 @@ def clear_nonexistent_import_errors(file_paths: list[str] | None, session=NEW_SE
         """
         query = session.query(errors.ImportError)
         if file_paths:
-            query = query.filter(~errors.ImportError.filename.in_(file_paths))
+            for file_path in file_paths:
+                if file_path.endswith(".zip"):
+                    query = query.filter(~(errors.ImportError.filename.startswith(file_path)))
+                else:
+                    query = query.filter(errors.ImportError.filename != file_path)
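With this change, an import error is kept out of the cleanup when its filename equals one of the non-zip paths or starts with one of the zip paths. A minimal pure-Python sketch of that predicate may make the semantics easier to check; the helper name `is_stale` is hypothetical and not part of Airflow.

```python
def is_stale(filename: str, file_paths: list[str]) -> bool:
    """Return True if the diff's filter would select this import error for clearing.

    Hypothetical helper (not part of Airflow) that mirrors the per-path loop:
    zip paths are matched by prefix, every other path by exact equality.
    """
    for file_path in file_paths:
        if file_path.endswith(".zip"):
            # Assumption: errors for DAGs inside a zip are stored under a
            # filename that starts with the zip path, hence startswith().
            if filename.startswith(file_path):
                return False
        elif filename == file_path:
            return False
    return True


paths = ["/dags/etl.py", "/dags/bundle.zip"]
print(is_stale("/dags/etl.py", paths))                   # False: exact match
print(is_stale("/dags/bundle.zip/inner_dag.py", paths))  # False: inside a listed zip
print(is_stale("/dags/removed.py", paths))               # True: file no longer present
```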
Review Comment:
Thanks @potiuk for the links. I tested building the query with a for loop, and its execution time seems to grow with the number of WHERE clauses, whereas the "in" query is faster. The same effect is likely when there are many zip files, since `startswith` is used for those. An index seems to help, but the "in" query still appears to take roughly constant time regardless of the number of file paths, so the fix might involve creating a separate column in the import_error table, as initially suggested.
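The separate-column idea could look roughly like the following sketch. It assumes a hypothetical `source_path` column that stores the zip path for zipped DAGs and the file path otherwise, so the cleanup goes back to a single `NOT IN`. The column name, the `source_path_for` helper and the schema are illustrative assumptions, and an in-memory SQLite database stands in for PostgreSQL.

```python
import sqlite3


def source_path_for(filename: str) -> str:
    """Hypothetical: map an import-error filename to the path that was scanned.

    For "<archive>.zip/<inner file>" return the zip path, otherwise the filename itself.
    """
    idx = filename.find(".zip/")
    if idx != -1:
        return filename[: idx + len(".zip")]
    return filename


conn = sqlite3.connect(":memory:")
# Hypothetical schema: the existing filename plus an extra, indexed source_path column.
conn.execute("CREATE TABLE import_error (filename TEXT PRIMARY KEY, source_path TEXT NOT NULL)")
conn.execute("CREATE INDEX source_path_idx ON import_error (source_path)")

rows = ["/dags/etl.py", "/dags/bundle.zip/inner_dag.py", "/dags/removed.py"]
conn.executemany(
    "INSERT INTO import_error (filename, source_path) VALUES (?, ?)",
    [(f, source_path_for(f)) for f in rows],
)

file_paths = ["/dags/etl.py", "/dags/bundle.zip"]
placeholders = ", ".join("?" for _ in file_paths)
# A single NOT IN over the new column covers both plain files and zip archives.
stale = [
    row[0]
    for row in conn.execute(
        f"SELECT filename FROM import_error WHERE source_path NOT IN ({placeholders})",
        file_paths,
    )
]
print(stale)  # ['/dags/removed.py']
```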
Note that the import_error table contains only one entry, and the results are for PostgreSQL 15 on Ubuntu.
```python
In [13]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(1_000):
    ...:         query = query.filter(errors.ImportError.filename != str(i))
    ...:     print(query.count())
    ...:
1
CPU times: user 65.7 ms, sys: 323 µs, total: 66 ms
Wall time: 75 ms

In [14]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(10_000):
    ...:         query = query.filter(errors.ImportError.filename != str(i))
    ...:     print(query.count())
    ...:
1
CPU times: user 1.02 s, sys: 0 ns, total: 1.02 s
Wall time: 1.1 s

In [16]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     filenames = [str(i) for i in range(10_000)]
    ...:     query.filter(~errors.ImportError.filename.in_(filenames))
    ...:     print(query.count())
    ...:
1
CPU times: user 19.8 ms, sys: 404 µs, total: 20.2 ms
Wall time: 20.7 ms

In [18]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(10_000):
    ...:         query = query.filter(~(errors.ImportError.filename.startswith(str(i))))
    ...:     print(query.count())
    ...:
1
CPU times: user 2 s, sys: 11.5 ms, total: 2.01 s
Wall time: 2.14 s
```
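One caveat when reading these timings: SQLAlchemy's `Query.filter()` returns a new `Query` instead of modifying the existing one, so the In [16] cell (like the later In [23] cell) discards the `IN` filter, and its `count()` runs without it. The following stdlib-only sketch is one way to re-check the comparison. It uses an in-memory SQLite database as a stand-in for PostgreSQL 15 (absolute numbers will differ), builds both query shapes, makes sure each filter is actually part of the executed statement, and checks that they return the same count. The "single `NOT IN` plus per-zip prefix checks" shape is an illustrative alternative, and the path names are made up.

```python
import sqlite3
import time


def chained_where(file_paths):
    """Mirror the diff: one predicate per path, all ANDed together."""
    clauses, params = [], []
    for path in file_paths:
        if path.endswith(".zip"):
            # Prefix test without LIKE, which is case-insensitive in SQLite by default.
            clauses.append("substr(filename, 1, length(?)) != ?")
            params += [path, path]
        else:
            clauses.append("filename != ?")
            params.append(path)
    return (" AND ".join(clauses) or "1"), params


def combined_where(file_paths):
    """Alternative shape: one NOT IN for plain files, prefix checks only for zips."""
    plain = [p for p in file_paths if not p.endswith(".zip")]
    zips = [p for p in file_paths if p.endswith(".zip")]
    clauses, params = [], []
    if plain:
        clauses.append(f"filename NOT IN ({', '.join('?' for _ in plain)})")
        params += plain
    for path in zips:
        clauses.append("substr(filename, 1, length(?)) != ?")
        params += [path, path]
    return (" AND ".join(clauses) or "1"), params


def count_stale(conn, build_where, file_paths):
    where, params = build_where(file_paths)
    # The WHERE clause is embedded in the executed statement, so the filter really applies.
    sql = f"SELECT COUNT(*) FROM import_error WHERE {where}"
    return conn.execute(sql, params).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE import_error (filename TEXT NOT NULL UNIQUE)")
conn.executemany(
    "INSERT INTO import_error VALUES (?)",
    [("/dags/removed.py",), ("/dags/bundle_3.zip/inner.py",), ("/dags/file_7.py",)],
)

# Made-up workload: 450 plain files and 50 zip archives, kept small enough to stay
# under SQLite's default expression-depth and bound-parameter limits.
file_paths = [f"/dags/file_{i}.py" for i in range(450)]
file_paths += [f"/dags/bundle_{i}.zip" for i in range(50)]

results = {}
for name, builder in [("chained", chained_where), ("combined", combined_where)]:
    start = time.perf_counter()
    results[name] = count_stale(conn, builder, file_paths)
    print(f"{name}: count={results[name]}, {time.perf_counter() - start:.4f}s")
```

Only `/dags/removed.py` should be counted as stale by both shapes, since the other two rows match a listed file or a listed zip.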
After adding an index with `CREATE UNIQUE INDEX filename_idx ON import_error (filename);`:
```python
In [21]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(1_000):
    ...:         query = query.filter(errors.ImportError.filename != str(i))
    ...:     print(query.count())
    ...:
1
CPU times: user 50.1 ms, sys: 91 µs, total: 50.1 ms
Wall time: 61.4 ms

In [22]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(10_000):
    ...:         query = query.filter(errors.ImportError.filename != str(i))
    ...:     print(query.count())
    ...:
1
CPU times: user 599 ms, sys: 3.87 ms, total: 603 ms
Wall time: 699 ms

In [23]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     filenames = [str(i) for i in range(10_000)]
    ...:     query.filter(~errors.ImportError.filename.in_(filenames))
    ...:     print(query.count())
    ...:
1
CPU times: user 18.1 ms, sys: 0 ns, total: 18.1 ms
Wall time: 18.8 ms

In [24]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(10_000):
    ...:         query = query.filter(~(errors.ImportError.filename.startswith(str(i))))
    ...:     print(query.count())
    ...:
1
CPU times: user 1.33 s, sys: 12 ms, total: 1.34 s
Wall time: 1.48 s
```
After creating an index that supports prefix (`LIKE 'abc%'`) searches, `CREATE UNIQUE INDEX filename_idx ON import_error (filename text_pattern_ops);` (see https://stackoverflow.com/questions/31391101/postgresql-index-for-like-abc-searching):
```python
In [26]: %%time
    ...: with create_session() as session:
    ...:     query = session.query(errors.ImportError)
    ...:     for i in range(10_000):
    ...:         query = query.filter(~(errors.ImportError.filename.startswith(str(i))))
    ...:     print(query.count())
    ...:
1
CPU times: user 835 ms, sys: 76 µs, total: 835 ms
Wall time: 996 ms
```
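The `text_pattern_ops` index is relevant because, under character-by-character ordering, "starts with `p`" is equivalent to a range: every matching string sorts at or after `p` and before `p` with its last character incremented. The PostgreSQL documentation describes `text_pattern_ops` as comparing values strictly character by character, so that a B-tree index can serve left-anchored `LIKE` patterns when the database does not use the C locale. The following small sketch shows the same range idea on SQLite, whose default `BINARY` collation already compares byte-wise. The helper `prefix_upper_bound` and the sample paths are hypothetical.

```python
import sqlite3


def prefix_upper_bound(prefix: str) -> str:
    """Smallest string that sorts after every string starting with `prefix`.

    Hypothetical helper. Assumes code-point ordering and ignores edge cases at
    the top of the Unicode range (e.g. a last character of U+10FFFF).
    """
    if not prefix:
        raise ValueError("an empty prefix matches everything")
    return prefix[:-1] + chr(ord(prefix[-1]) + 1)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE import_error (filename TEXT NOT NULL)")
conn.execute("CREATE UNIQUE INDEX filename_idx ON import_error (filename)")
conn.executemany(
    "INSERT INTO import_error VALUES (?)",
    [("/dags/a.py",), ("/dags/bundle.zip/a.py",), ("/dags/bundle.zip/b.py",), ("/dags/other.py",)],
)

zip_path = "/dags/bundle.zip"
# Range form of "filename starts with zip_path"; the index on filename can serve it.
in_zip = [
    row[0]
    for row in conn.execute(
        "SELECT filename FROM import_error WHERE filename >= ? AND filename < ? ORDER BY filename",
        (zip_path, prefix_upper_bound(zip_path)),
    )
]
print(in_zip)  # ['/dags/bundle.zip/a.py', '/dags/bundle.zip/b.py']
```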