1fanwang commented on code in PR #66788:
URL: https://github.com/apache/airflow/pull/66788#discussion_r3232439118
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors(
return []
except OperationalError:
raise
+ except IntegrityError as exc:
+ # Multiple Dag processors writing the same brand-new Dag can race on the INSERT.
+ # The loser's transaction is already invalid, so we must roll the session back to
+ # avoid PendingRollbackError on subsequent per-Dag work in this parsing cycle.
+ # The winning peer already produced the correct row, so this is not an import error
+ # and we don't retry. Non-unique IntegrityErrors (e.g. NOT-NULL violations from a
+ # genuinely malformed Dag) fall through to the generic Exception arm.
Review Comment:
The trace that lands in `import_errors` on each dialect, captured by
reverting the new `except IntegrityError` arm and rerunning the regression test
against `main`:
```
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id
sqlalchemy.exc.IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'")
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
```
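The SQLite line above can be reproduced with only the standard library. This is a minimal sketch, not the regression test from the PR. It assumes a hypothetical two-column `serialized_dag` table keyed on `dag_id` (the real Airflow table has more columns) and uses two plain `sqlite3` connections as stand-ins for the winning and losing Dag processors, so the error comes straight from `sqlite3` rather than being wrapped by SQLAlchemy:

```python
import os
import sqlite3
import tempfile


def reproduce_duplicate_insert() -> str:
    """Return the message SQLite gives the processor that loses the INSERT race."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "airflow.db")
        # Hypothetical, heavily simplified stand-in for the serialized_dag table.
        setup = sqlite3.connect(path)
        setup.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT NOT NULL)")
        setup.commit()
        setup.close()

        winner = sqlite3.connect(path)
        loser = sqlite3.connect(path)
        try:
            # The winning processor inserts and commits the brand-new Dag first.
            winner.execute("INSERT INTO serialized_dag VALUES (?, ?)", ("my_dag", "{}"))
            winner.commit()
            # The losing processor then tries to insert the same primary key.
            try:
                loser.execute("INSERT INTO serialized_dag VALUES (?, ?)", ("my_dag", "{}"))
            except sqlite3.IntegrityError as exc:
                loser.rollback()
                return str(exc)
            raise AssertionError("expected a UNIQUE constraint violation")
        finally:
            # Close both connections before the temporary directory is removed.
            winner.close()
            loser.close()


print(reproduce_duplicate_insert())
# UNIQUE constraint failed: serialized_dag.dag_id
```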
The IntegrityError raised by the losing processor's
`SerializedDagModel.write_dag` is caught by the existing generic `except
Exception` arm in `_serialize_dag_capturing_errors`, fed through
`traceback.format_exc(...)`, and recorded as the import-error value for the
parsing cycle. The loser's now-invalid transaction also causes
`PendingRollbackError` on the next per-Dag write in the same
`update_dag_parsing_results_in_db` call. Added a before/after pytest snippet to
the PR body that surfaces each dialect's exact message.
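The handling described in the diff comment can be sketched with the standard-library `sqlite3` module standing in for SQLAlchemy. This is a minimal sketch under stated assumptions, not the PR's implementation. `write_dag_capturing_errors`, `_is_unique_violation`, `_as_import_error` and the two-column `serialized_dag` table are hypothetical. The uniqueness check only recognizes SQLite's message, whereas MySQL and PostgreSQL report the violation differently, as the traces above show. A single connection also plays both the winning and the losing processor.

```python
import sqlite3
import traceback
from typing import List, Optional, Tuple

ImportErrors = List[Tuple[str, str]]


def _is_unique_violation(exc: sqlite3.IntegrityError) -> bool:
    # Hypothetical SQLite-only heuristic: PRIMARY KEY and UNIQUE violations both
    # report "UNIQUE constraint failed: <table>.<column>". MySQL (1062) and
    # PostgreSQL (UniqueViolation) would each need their own check.
    return str(exc).startswith("UNIQUE constraint failed")


def _as_import_error(dag_id: str) -> ImportErrors:
    # Only called from inside an except block, so format_exc() sees the active exception.
    return [(dag_id, traceback.format_exc())]


def write_dag_capturing_errors(conn: sqlite3.Connection, dag_id: str, data: Optional[str]) -> ImportErrors:
    """Hypothetical stand-in for _serialize_dag_capturing_errors; returns import errors."""
    try:
        conn.execute("INSERT INTO serialized_dag (dag_id, data) VALUES (?, ?)", (dag_id, data))
        conn.commit()
        return []
    except sqlite3.OperationalError:
        raise
    except sqlite3.IntegrityError as exc:
        # Reset the failed transaction so the next per-Dag write starts clean
        # (the analogue of avoiding PendingRollbackError on a SQLAlchemy session).
        conn.rollback()
        if _is_unique_violation(exc):
            # A peer processor already wrote this row: not an import error, no retry.
            return []
        # Any other IntegrityError (e.g. NOT NULL) is reported like a generic failure.
        return _as_import_error(dag_id)
    except Exception:
        return _as_import_error(dag_id)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT NOT NULL)")
conn.commit()

# Stand-in for the winning peer: my_dag is written and committed first.
assert write_dag_capturing_errors(conn, "my_dag", "{}") == []

lost_race = write_dag_capturing_errors(conn, "my_dag", "{}")   # duplicate key
malformed = write_dag_capturing_errors(conn, "bad_dag", None)  # NOT NULL violation
follow_up = write_dag_capturing_errors(conn, "other_dag", "{}")

print(lost_race)        # []
print(malformed[0][0])  # bad_dag
print(follow_up)        # []
```

The rollback happens before the error is classified, because in the SQLAlchemy case the failed INSERT poisons the session whether or not the violation was a uniqueness conflict. Only the classification decides whether an import error is recorded.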