shivanshs9 opened a new pull request #14774: URL: https://github.com/apache/airflow/pull/14774
Closes #14770

---

Fixes various runtime errors in smart sensor:

1. UnboundLocalError

```
UnboundLocalError: local variable 'query_result' referenced before assignment
  File "airflow/sensors/smart_sensor.py", line 639, in _execute_sensor_work
    self._mark_multi_state(
  File "airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "airflow/sensors/smart_sensor.py", line 470, in _mark_multi_state
    self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)
```

2. RuntimeError

```
RuntimeError: dictionary changed size during iteration
  File "airflow/sentry.py", line 159, in wrapper
    return func(task_instance, *args, session=session, **kwargs)
  File "airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "airflow/sensors/smart_sensor.py", line 736, in execute
    self.flush_cached_sensor_poke_results()
  File "airflow/sensors/smart_sensor.py", line 681, in flush_cached_sensor_poke_results
    for ti_key, sensor_exception in self.cached_sensor_exceptions.items():
```

3. InvalidRequestError (should be related to 1)

```
InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Timeout, PID: 24375 (Background on this error at: http://sqlalche.me/e/13/7s2a)
  File "airflow/sensors/smart_sensor.py", line 639, in _execute_sensor_work
    self._mark_multi_state(
  File "airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "contextlib.py", line 120, in __exit__
    next(self.gen)
  File "airflow/utils/session.py", line 32, in create_session
    session.commit()
  File "sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "sqlalchemy/orm/session.py", line 502, in commit
    self._assert_active(prepared_ok=True)
  File "sqlalchemy/orm/session.py", line 289, in _assert_active
    raise sa_exc.InvalidRequestError(
```

Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]
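
For readers unfamiliar with the failure modes in errors 1 and 2 above, here is a minimal sketch in plain Python. It is not the Airflow code and not the change made in this PR: every function name below is a hypothetical stand-in, and how the real `cached_sensor_exceptions` dict gets mutated during iteration is not stated in this description. It only illustrates the general patterns (a name bound only inside a `try` block, and a dict mutated while its `.items()` view is iterated) along with one common way to avoid each.

```python
# Hypothetical stand-ins; none of these names come from airflow/sensors/smart_sensor.py.


def mark_states_buggy(run_query):
    """Pattern behind error 1: the name is bound only if the query succeeds."""
    try:
        query_result = run_query()
    except ValueError:
        pass  # the failure is swallowed, so query_result was never assigned
    return len(query_result)  # raises UnboundLocalError if run_query() raised


def mark_states_fixed(run_query):
    """Bind the name before the try block so later code can always read it."""
    query_result = []
    try:
        query_result = run_query()
    except ValueError:
        pass
    return len(query_result)


def flush_buggy(cache):
    """Pattern behind error 2: the dict changes size while .items() is iterated."""
    for key, _value in cache.items():
        del cache[key]  # the next iteration step raises RuntimeError


def flush_fixed(cache):
    """Iterate over a snapshot of the items, then mutate the original dict."""
    for key, _value in list(cache.items()):
        del cache[key]


def failing_query():
    """Simulates a query that fails before returning any rows."""
    raise ValueError("simulated query failure")
```

Calling `mark_states_fixed(failing_query)` returns `0` instead of raising, and `flush_fixed` empties the dict without error because the loop walks a list copy rather than the live view.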
