ephraimbuddy opened a new pull request #19553:
URL: https://github.com/apache/airflow/pull/19553
`DagFileProcessor.manage_slas` does not check whether an `SlaMiss` record already exists in the DB before inserting SLA misses.

If an SLA miss for a task has already been recorded and the task has had no more recent run, the next SLA check picks up the same task again and tries to insert the same record into the `SlaMiss` table, which raises an `IntegrityError`.

This PR fixes that by skipping the insert when the record already exists.
```
[2021-11-12 11:56:11,159] {processor.py:567} ERROR - Error executing SlaCallbackRequest callback for file: /files/dags/example_sla_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/airflow/dag_processing/processor.py", line 560, in execute_callbacks
    self.manage_slas(dagbag.dags.get(request.dag_id))
  File "/opt/airflow/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/dag_processing/processor.py", line 434, in manage_slas
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
```
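
For illustration only, the "skip the insert if the record already exists" idea can be sketched outside Airflow. This is not the code in this PR: it uses the standard-library `sqlite3` module instead of SQLAlchemy and Postgres, and the helper names `create_schema` and `record_sla_misses` are hypothetical. The only detail taken from the traceback above is the primary key `(task_id, dag_id, execution_date)`.

```python
import sqlite3


def create_schema(conn):
    # Hypothetical stand-in for the sla_miss table; it only models the
    # primary key named in the error: (task_id, dag_id, execution_date).
    conn.execute(
        """
        CREATE TABLE sla_miss (
            task_id TEXT NOT NULL,
            dag_id TEXT NOT NULL,
            execution_date TEXT NOT NULL,
            PRIMARY KEY (task_id, dag_id, execution_date)
        )
        """
    )


def record_sla_misses(conn, dag_id, candidates):
    """Insert only the (task_id, execution_date) pairs not yet recorded.

    Returns the number of rows actually inserted.
    """
    # Load the keys already recorded for this DAG.
    existing = set(
        conn.execute(
            "SELECT task_id, execution_date FROM sla_miss WHERE dag_id = ?",
            (dag_id,),
        )
    )
    # dict.fromkeys drops duplicates inside the batch while keeping order.
    new_rows = [
        (task_id, dag_id, execution_date)
        for task_id, execution_date in dict.fromkeys(candidates)
        if (task_id, execution_date) not in existing
    ]
    conn.executemany(
        "INSERT INTO sla_miss (task_id, dag_id, execution_date) VALUES (?, ?, ?)",
        new_rows,
    )
    conn.commit()
    return len(new_rows)


conn = sqlite3.connect(":memory:")
create_schema(conn)
miss = ("sleep_20", "2021-11-12 11:56:00+00")

# The first check records the miss; the second finds it and inserts nothing,
# so no unique-constraint violation is raised.
first = record_sla_misses(conn, "example_sla_dag", [miss])
second = record_sla_misses(conn, "example_sla_dag", [miss])
```

Without the existence check, the second call would attempt the same insert and fail on the primary key, which is the situation the traceback shows. A check-then-insert like this assumes a single writer; with concurrent writers, a database-level upsert would be the safer choice.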