ephraimbuddy opened a new pull request #19553:
URL: https://github.com/apache/airflow/pull/19553


   `DagFileProcessor.manage_slas` does not check whether an `SlaMiss` record
   already exists in the DB before inserting SLA misses.
   
   If an SLA miss for a task has already been recorded and there is no more
   recent run of that task, the next SLA check picks up the same task again
   and tries to insert the same record into the `SlaMiss` table a second time,
   which raises an `IntegrityError`.
   
   This PR fixes that by skipping the insert if the record already exists.
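
The check-before-insert idea can be sketched in isolation. This is a minimal illustration, not the code changed in this PR: it uses the standard-library `sqlite3` module rather than Airflow's SQLAlchemy session, and the helper name `record_sla_misses` and the simplified three-column `sla_miss` table are assumptions made for the example. Only the primary key columns (`task_id`, `dag_id`, `execution_date`) are taken from the error message in the log.

```python
import sqlite3


def record_sla_misses(conn, misses):
    """Insert only the SLA misses that are not already stored.

    `misses` is an iterable of (task_id, dag_id, execution_date) tuples.
    Returns the list of keys that were actually inserted.
    (Hypothetical helper for illustration; not part of Airflow.)
    """
    # Load the keys that are already recorded so duplicates can be skipped.
    existing = set(
        conn.execute("SELECT task_id, dag_id, execution_date FROM sla_miss")
    )
    new_rows = []
    for key in misses:
        if key in existing:
            continue  # already recorded: inserting again would violate the primary key
        existing.add(key)  # also guards against duplicates within `misses`
        new_rows.append(key)
    conn.executemany(
        "INSERT INTO sla_miss (task_id, dag_id, execution_date) VALUES (?, ?, ?)",
        new_rows,
    )
    conn.commit()
    return new_rows


# Simplified stand-in for the real table, with the same composite primary key.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sla_miss ("
    "task_id TEXT, dag_id TEXT, execution_date TEXT, "
    "PRIMARY KEY (task_id, dag_id, execution_date))"
)

miss = ("sleep_20", "example_sla_dag", "2021-11-12 11:56:00+00")
first = record_sla_misses(conn, [miss])   # inserts the row
second = record_sla_misses(conn, [miss])  # skips it instead of raising
```

Without the `existing` check, the second call would fail with a unique-constraint error, which is the same class of failure shown in the log.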
   
   ```
   [2021-11-12 11:56:11,159] {processor.py:567} ERROR - Error executing SlaCallbackRequest callback for file: /files/dags/example_sla_dag.py
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
       cursor.executemany(statement, parameters)
   psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "sla_miss_pkey"
   DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/dag_processing/processor.py", line 560, in execute_callbacks
       self.manage_slas(dagbag.dags.get(request.dag_id))
     File "/opt/airflow/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/dag_processing/processor.py", line 434, in manage_slas
       session.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1046, in commit
       self.transaction.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
       rec.execute(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
       uow,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
       insert,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
       c = cached_connections[connection].execute(statement, multiparams)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
       cursor.executemany(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "sla_miss_pkey"
   DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
   ```
   
   
   

