Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-rq for openSUSE:Factory checked in at 2023-12-07 19:10:41
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-rq (Old)
 and      /work/SRC/openSUSE:Factory/.python-rq.new.25432 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-rq" Thu Dec 7 19:10:41 2023 rev:14 rq:1131504 version:1.15.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-rq/python-rq.changes 2023-06-12 15:27:28.799417104 +0200 +++ /work/SRC/openSUSE:Factory/.python-rq.new.25432/python-rq.changes 2023-12-07 19:12:24.471691005 +0100 @@ -1,0 +2,9 @@ +Wed Dec 6 22:25:44 UTC 2023 - Dirk Müller <dmuel...@suse.com> + +- update to 1.15.1: + * Fixed a bug that may cause a crash when cleaning intermediate + queue. + * Fixed a bug that may cause canceled jobs to still run + dependent jobs. + +------------------------------------------------------------------- Old: ---- rq-1.15.tar.gz New: ---- rq-1.15.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-rq.spec ++++++ --- /var/tmp/diff_new_pack.tJ8APS/_old 2023-12-07 19:12:26.631770692 +0100 +++ /var/tmp/diff_new_pack.tJ8APS/_new 2023-12-07 19:12:26.631770692 +0100 @@ -29,7 +29,7 @@ %{?sle15_python_module_pythons} Name: python-rq%{psuffix} -Version: 1.15 +Version: 1.15.1 Release: 0 Summary: Easy Job Queues for Python License: Apache-2.0 ++++++ rq-1.15.tar.gz -> rq-1.15.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/CHANGES.md new/rq-1.15.1/CHANGES.md --- old/rq-1.15/CHANGES.md 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/CHANGES.md 2023-06-20 01:28:45.000000000 +0200 @@ -1,3 +1,7 @@ +### RQ 1.15.1 (2023-06-20) +* Fixed a bug that may cause a crash when cleaning intermediate queue. Thanks @selwin! +* Fixed a bug that may cause canceled jobs to still run dependent jobs. Thanks @fredsod! + ### RQ 1.15 (2023-05-24) * Added `Callback(on_stopped='my_callback)`. Thanks @eswolinsky3241! * `Callback` now accepts dotted path to function as input. Thanks @rishabh-ranjan! 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/Makefile new/rq-1.15.1/Makefile --- old/rq-1.15/Makefile 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/Makefile 2023-06-20 01:28:45.000000000 +0200 @@ -19,3 +19,7 @@ git push --tags python setup.py sdist bdist_wheel twine upload dist/* + +lint: + @ black --check --skip-string-normalization --line-length 120 rq tests + @ ruff check --show-source rq tests diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/dev-requirements-36.txt new/rq-1.15.1/dev-requirements-36.txt --- old/rq-1.15/dev-requirements-36.txt 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/dev-requirements-36.txt 2023-06-20 01:28:45.000000000 +0200 @@ -3,4 +3,4 @@ psutil pytest pytest-cov -sentry-sdk \ No newline at end of file +sentry-sdk diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/__init__.py new/rq-1.15.1/rq/__init__.py --- old/rq-1.15/rq/__init__.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/__init__.py 2023-06-20 01:28:45.000000000 +0200 @@ -5,4 +5,19 @@ from .version import VERSION from .worker import SimpleWorker, Worker +__all__ = [ + "Connection", + "get_current_connection", + "pop_connection", + "push_connection", + "Callback", + "Retry", + "cancel_job", + "get_current_job", + "requeue_job", + "Queue", + "SimpleWorker", + "Worker", +] + __version__ = VERSION diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/job.py new/rq-1.15.1/rq/job.py --- old/rq-1.15/rq/job.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/job.py 2023-06-20 01:28:45.000000000 +0200 @@ -1586,7 +1586,7 @@ # If parent job is not finished, we should only continue # if this job allows parent job to fail dependencies_ids.discard(parent_job.id) - if parent_job._status == JobStatus.CANCELED: + if parent_job.get_status() == 
JobStatus.CANCELED: return False elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures: return False diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/maintenance.py new/rq-1.15.1/rq/maintenance.py --- old/rq-1.15/rq/maintenance.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/maintenance.py 2023-06-20 01:28:45.000000000 +0200 @@ -21,5 +21,6 @@ for job_id in job_ids: if job_id not in queue.started_job_registry: job = queue.fetch_job(job_id) - worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.') + if job: + worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.') queue.connection.lrem(queue.intermediate_queue_key, 1, job_id) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/queue.py new/rq-1.15.1/rq/queue.py --- old/rq-1.15/rq/queue.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/queue.py 2023-06-20 01:28:45.000000000 +0200 @@ -893,7 +893,7 @@ kwargs (*kwargs): function kargs """ if not isinstance(f, str) and f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers') + raise ValueError('Functions from the __main__ module cannot be processed by workers') # Detect explicit invocations, i.e. 
of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30) @@ -1206,6 +1206,7 @@ pipeline=pipe, exclude_job_id=exclude_job_id, ) + and dependent_job.get_status(refresh=False) != JobStatus.CANCELED ] pipe.multi() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/timeouts.py new/rq-1.15.1/rq/timeouts.py --- old/rq-1.15/rq/timeouts.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/timeouts.py 2023-06-20 01:28:45.000000000 +0200 @@ -60,7 +60,7 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): def handle_death_penalty(self, signum, frame): - raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout)) + raise self._exception('Task exceeded maximum timeout value ({0} seconds)'.format(self._timeout)) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/version.py new/rq-1.15.1/rq/version.py --- old/rq-1.15/rq/version.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/version.py 2023-06-20 01:28:45.000000000 +0200 @@ -1 +1 @@ -VERSION = '1.15.0' +VERSION = '1.15.1' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/worker.py new/rq-1.15.1/rq/worker.py --- old/rq-1.15/rq/worker.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/rq/worker.py 2023-06-20 01:28:45.000000000 +0200 @@ -456,6 +456,62 @@ self.teardown() return bool(completed_jobs) + def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): + """ + Handles the failure or an executing job by: + 1. Setting the job status to failed + 2. Removing the job from StartedJobRegistry + 3. Setting the workers current job to None + 4. 
Add the job to FailedJobRegistry + `save_exc_to_job` should only be used for testing purposes + """ + self.log.debug('Handling failed execution of job %s', job.id) + with self.connection.pipeline() as pipeline: + if started_job_registry is None: + started_job_registry = StartedJobRegistry( + job.origin, self.connection, job_class=self.job_class, serializer=self.serializer + ) + + # check whether a job was stopped intentionally and set the job + # status appropriately if it was this job. + job_is_stopped = self._stopped_job_id == job.id + retry = job.retries_left and job.retries_left > 0 and not job_is_stopped + + if job_is_stopped: + job.set_status(JobStatus.STOPPED, pipeline=pipeline) + self._stopped_job_id = None + else: + # Requeue/reschedule if retry is configured, otherwise + if not retry: + job.set_status(JobStatus.FAILED, pipeline=pipeline) + + started_job_registry.remove(job, pipeline=pipeline) + + if not self.disable_default_exception_handler and not retry: + job._handle_failure(exc_string, pipeline=pipeline) + with suppress(redis.exceptions.ConnectionError): + pipeline.execute() + + self.set_current_job_id(None, pipeline=pipeline) + self.increment_failed_job_count(pipeline) + if job.started_at and job.ended_at: + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) + + if retry: + job.retry(queue, pipeline) + enqueue_dependents = False + else: + enqueue_dependents = True + + try: + pipeline.execute() + if enqueue_dependents: + queue.enqueue_dependents(job) + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass + def _start_scheduler( self, burst: bool = False, @@ -653,7 +709,7 @@ connection: Union[Redis, 'Pipeline'] = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) - self.log.debug('Sent heartbeat to prevent worker timeout. 
' 'Next one should arrive in %s seconds.', timeout) + self.log.debug('Sent heartbeat to prevent worker timeout. Next one should arrive in %s seconds.', timeout) class Worker(BaseWorker): @@ -947,7 +1003,7 @@ if self.get_state() == WorkerStatus.BUSY: self._stop_requested = True self.set_shutdown_requested_date() - self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') + self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.') if self.scheduler: self.stop_scheduler() else: @@ -1294,62 +1350,6 @@ msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): - """ - Handles the failure or an executing job by: - 1. Setting the job status to failed - 2. Removing the job from StartedJobRegistry - 3. Setting the workers current job to None - 4. Add the job to FailedJobRegistry - `save_exc_to_job` should only be used for testing purposes - """ - self.log.debug('Handling failed execution of job %s', job.id) - with self.connection.pipeline() as pipeline: - if started_job_registry is None: - started_job_registry = StartedJobRegistry( - job.origin, self.connection, job_class=self.job_class, serializer=self.serializer - ) - - # check whether a job was stopped intentionally and set the job - # status appropriately if it was this job. 
- job_is_stopped = self._stopped_job_id == job.id - retry = job.retries_left and job.retries_left > 0 and not job_is_stopped - - if job_is_stopped: - job.set_status(JobStatus.STOPPED, pipeline=pipeline) - self._stopped_job_id = None - else: - # Requeue/reschedule if retry is configured, otherwise - if not retry: - job.set_status(JobStatus.FAILED, pipeline=pipeline) - - started_job_registry.remove(job, pipeline=pipeline) - - if not self.disable_default_exception_handler and not retry: - job._handle_failure(exc_string, pipeline=pipeline) - with suppress(redis.exceptions.ConnectionError): - pipeline.execute() - - self.set_current_job_id(None, pipeline=pipeline) - self.increment_failed_job_count(pipeline) - if job.started_at and job.ended_at: - self.increment_total_working_time(job.ended_at - job.started_at, pipeline) - - if retry: - job.retry(queue, pipeline) - enqueue_dependents = False - else: - enqueue_dependents = True - - try: - pipeline.execute() - if enqueue_dependents: - queue.enqueue_dependents(job) - except Exception: - # Ensure that custom exception handlers are called - # even if Redis is down - pass - def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry): """Handles the successful execution of certain job. 
It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`, @@ -1498,7 +1498,9 @@ extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra) + self.log.error( + '[Job %s]: exception raised while executing (%s)\n%s', job.id, func_name, exc_string, extra=extra + ) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/tests/test_worker.py new/rq-1.15.1/tests/test_worker.py --- old/rq-1.15/tests/test_worker.py 2023-05-27 04:26:09.000000000 +0200 +++ new/rq-1.15.1/tests/test_worker.py 2023-06-20 01:28:45.000000000 +0200 @@ -4,6 +4,7 @@ import signal import subprocess import sys +import threading import time import zlib from datetime import datetime, timedelta @@ -268,7 +269,7 @@ w.perform_job(job, queue) # An exception should be logged here at ERROR level - self.assertIn("Traceback", mock_logger_error.call_args[0][0]) + self.assertIn("Traceback", mock_logger_error.call_args[0][3]) def test_heartbeat(self): """Heartbeat saves last_heartbeat""" @@ -594,6 +595,91 @@ # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + def test_cancel_running_parent_job(self): + """Cancel a running parent job and verify that + dependent jobs are not started.""" + + def cancel_parent_job(job): + while job.is_queued: + time.sleep(1) + + job.cancel() + return + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5) + + job = q.enqueue(say_hello, depends_on=parent_job) + job2 = q.enqueue(say_hello, depends_on=job) + status_thread = threading.Thread(target=cancel_parent_job, args=(parent_job,)) + status_thread.start() + + w = Worker([q]) + w.work( + burst=True, + ) + status_thread.join() + + self.assertNotEqual(parent_job.result, None) 
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertEqual(job.result, None) + self.assertEqual(job2.get_status(), JobStatus.DEFERRED) + self.assertEqual(job2.result, None) + self.assertEqual(q.count, 0) + + def test_cancel_dependent_job(self): + """Cancel job and verify that when the parent job is finished, + the dependent job is not started.""" + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") + job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") + job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") + job.cancel() + + w = Worker([q]) + w.work( + burst=True, + ) + self.assertTrue(job.is_canceled) + self.assertNotEqual(parent_job.result, None) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + self.assertEqual(job.result, None) + self.assertEqual(job2.result, None) + self.assertEqual(job2.get_status(), JobStatus.DEFERRED) + self.assertEqual(q.count, 0) + + def test_cancel_job_enqueue_dependent(self): + """Cancel a job in a chain and enqueue the dependent jobs.""" + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") + job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") + job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") + job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3") + + job.cancel(enqueue_dependents=True) + + w = Worker([q]) + w.work( + burst=True, + ) + self.assertTrue(job.is_canceled) + self.assertNotEqual(parent_job.result, None) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + self.assertEqual(job.result, None) + self.assertNotEqual(job2.result, None) + self.assertEqual(job2.get_status(), JobStatus.FINISHED) + self.assertEqual(job3.get_status(), JobStatus.FINISHED) + + self.assertEqual(q.count, 0) + @slow def test_max_idle_time(self): q = Queue()