tirkarthi commented on code in PR #28256:
URL: https://github.com/apache/airflow/pull/28256#discussion_r1112980514


##########
airflow/dag_processing/manager.py:
##########
@@ -782,7 +782,11 @@ def clear_nonexistent_import_errors(file_paths: list[str] 
| None, session=NEW_SE
         """
         query = session.query(errors.ImportError)
         if file_paths:
-            query = query.filter(~errors.ImportError.filename.in_(file_paths))
+            for file_path in file_paths:
+                if file_path.endswith(".zip"):
+                    query = 
query.filter(~(errors.ImportError.filename.startswith(file_path)))
+                else:
+                    query = query.filter(errors.ImportError.filename != 
file_path)

Review Comment:
   Thanks @potiuk for the links. I tested the query with a for loop and it 
seems to be increasing based on the where clause though the "in" query is 
faster. This effect might be the same when large zip files are present where 
startswith is used. An index seems to help here but still the "in" query seems 
constant time irrespective of the length of the files and it might involve 
creating a separate column in the import_error table as per the initial 
suggestion.
   
   Note that the import_error contains only entry and the results are for 
postgresql 15 on ubuntu.
   
   ```python
   In [13]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(1_000):
       ...:         query = query.filter(errors.ImportError.filename != str(i))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 65.7 ms, sys: 323 µs, total: 66 ms
   Wall time: 75 ms
   
   In [14]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(10_000):
       ...:         query = query.filter(errors.ImportError.filename != str(i))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 1.02 s, sys: 0 ns, total: 1.02 s
   Wall time: 1.1 s
   
   In [16]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     filenames = [str(i) for i in range(10_000)]
       ...:     query.filter(~errors.ImportError.filename.in_(filenames))
       ...:     print(query.count())
       ...: 
       ...: 
       ...: 
   1
   CPU times: user 19.8 ms, sys: 404 µs, total: 20.2 ms
   Wall time: 20.7 ms
   
   In [18]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(10_000):
       ...:         query = 
query.filter(~(errors.ImportError.filename.startswith(str(i))))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 2 s, sys: 11.5 ms, total: 2.01 s
   Wall time: 2.14 s
   ```
   
   After index `CREATE UNIQUE INDEX filename_idx ON import_error (filename);`
   
   ```python
   In [21]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(1_000):
       ...:         query = query.filter(errors.ImportError.filename != str(i))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 50.1 ms, sys: 91 µs, total: 50.1 ms
   Wall time: 61.4 ms
   
   In [22]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(10_000):
       ...:         query = query.filter(errors.ImportError.filename != str(i))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 599 ms, sys: 3.87 ms, total: 603 ms
   Wall time: 699 ms
   
   In [23]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     filenames = [str(i) for i in range(10_000)]
       ...:     query.filter(~errors.ImportError.filename.in_(filenames))
       ...:     print(query.count())
       ...: 
       ...: 
       ...: 
   1
   CPU times: user 18.1 ms, sys: 0 ns, total: 18.1 ms
   Wall time: 18.8 ms
   
   In [24]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(10_000):
       ...:         query = 
query.filter(~(errors.ImportError.filename.startswith(str(i))))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 1.33 s, sys: 12 ms, total: 1.34 s
   Wall time: 1.48 s
   ```
   
   After index to help text search `CREATE UNIQUE INDEX filename_idx ON 
import_error (filename text_pattern_ops);` 
https://stackoverflow.com/questions/31391101/postgresql-index-for-like-abc-searching
   
   ```python
   In [26]: %%time
       ...: with create_session() as session:
       ...:     query = session.query(errors.ImportError)
       ...:     for i in range(10_000):
       ...:         query = 
query.filter(~(errors.ImportError.filename.startswith(str(i))))
       ...:     print(query.count())
       ...: 
       ...: 
   1
   CPU times: user 835 ms, sys: 76 µs, total: 835 ms
   Wall time: 996 ms
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to