shivanshs9 opened a new pull request #14774:
URL: https://github.com/apache/airflow/pull/14774


   Closes #14770
   
   ---
   
   Fixes several runtime errors in the smart sensor:
   1. UnboundLocalError
   ```
   UnboundLocalError: local variable 'query_result' referenced before assignment
     File "airflow/sensors/smart_sensor.py", line 639, in _execute_sensor_work
       self._mark_multi_state(
     File "airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "airflow/sensors/smart_sensor.py", line 470, in _mark_multi_state
       self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
   ```
   2. RuntimeError
   ```
   RuntimeError: dictionary changed size during iteration
     File "airflow/sentry.py", line 159, in wrapper
       return func(task_instance, *args, session=session, **kwargs)
     File "airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "airflow/sensors/smart_sensor.py", line 736, in execute
       self.flush_cached_sensor_poke_results()
     File "airflow/sensors/smart_sensor.py", line 681, in flush_cached_sensor_poke_results
       for ti_key, sensor_exception in self.cached_sensor_exceptions.items():
   ```
   3. InvalidRequestError (likely related to 1)
   ```
   InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Timeout, PID: 24375 (Background on this error at: http://sqlalche.me/e/13/7s2a)
     File "airflow/sensors/smart_sensor.py", line 639, in _execute_sensor_work
       self._mark_multi_state(
     File "airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "contextlib.py", line 120, in __exit__
       next(self.gen)
     File "airflow/utils/session.py", line 32, in create_session
       session.commit()
     File "sqlalchemy/orm/session.py", line 1046, in commit
       self.transaction.commit()
     File "sqlalchemy/orm/session.py", line 502, in commit
       self._assert_active(prepared_ok=True)
     File "sqlalchemy/orm/session.py", line 289, in _assert_active
       raise sa_exc.InvalidRequestError(
   ```
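
For readers unfamiliar with the first two failure modes, here is a minimal, self-contained sketch of the general patterns that avoid them. It is not necessarily the change made in this PR: `mark_states`, `flush_exceptions`, `fetch_rows`, and `is_done` are hypothetical names used only for illustration, and plain dicts stand in for task instances and the sensor's exception cache.

```python
import logging

log = logging.getLogger(__name__)


def mark_states(fetch_rows, state):
    """Hypothetical stand-in for a method that queries rows and marks them."""
    # Bind the names before the try block. If fetch_rows() raises, the log
    # call below can still evaluate len(query_result) instead of failing
    # with UnboundLocalError.
    query_result = []
    count_marked = 0
    try:
        query_result = fetch_rows()
        for row in query_result:
            row["state"] = state
            count_marked += 1
    except Exception:
        # Assumption: real code using a database session would also roll
        # the session back here before continuing.
        log.exception("Marking rows failed")
    log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
    return count_marked, len(query_result)


def flush_exceptions(cached_exceptions, is_done):
    """Remove finished entries from a dict while looping over it."""
    flushed = []
    # list(...) takes a snapshot of the items, so deleting keys inside the
    # loop does not raise "dictionary changed size during iteration".
    for key, exc in list(cached_exceptions.items()):
        if is_done(exc):
            flushed.append(key)
            del cached_exceptions[key]
    return flushed
```

Iterating over a snapshot costs one extra list per flush, which is usually negligible next to the database work. For the third error, the SQLAlchemy message itself names the remedy: call `Session.rollback()` after the failed flush before reusing the session.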
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

