Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-rq for openSUSE:Factory 
checked in at 2023-12-07 19:10:41
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-rq (Old)
 and      /work/SRC/openSUSE:Factory/.python-rq.new.25432 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-rq"

Thu Dec  7 19:10:41 2023 rev:14 rq:1131504 version:1.15.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-rq/python-rq.changes      2023-06-12 15:27:28.799417104 +0200
+++ /work/SRC/openSUSE:Factory/.python-rq.new.25432/python-rq.changes   2023-12-07 19:12:24.471691005 +0100
@@ -1,0 +2,9 @@
+Wed Dec  6 22:25:44 UTC 2023 - Dirk Müller <dmuel...@suse.com>
+
+- update to 1.15.1:
+  * Fixed a bug that may cause a crash when cleaning intermediate
+    queue.
+  * Fixed a bug that may cause canceled jobs to still run
+    dependent jobs.
+
+-------------------------------------------------------------------

Old:
----
  rq-1.15.tar.gz

New:
----
  rq-1.15.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-rq.spec ++++++
--- /var/tmp/diff_new_pack.tJ8APS/_old  2023-12-07 19:12:26.631770692 +0100
+++ /var/tmp/diff_new_pack.tJ8APS/_new  2023-12-07 19:12:26.631770692 +0100
@@ -29,7 +29,7 @@
 
 %{?sle15_python_module_pythons}
 Name:           python-rq%{psuffix}
-Version:        1.15
+Version:        1.15.1
 Release:        0
 Summary:        Easy Job Queues for Python
 License:        Apache-2.0

++++++ rq-1.15.tar.gz -> rq-1.15.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/CHANGES.md new/rq-1.15.1/CHANGES.md
--- old/rq-1.15/CHANGES.md      2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/CHANGES.md    2023-06-20 01:28:45.000000000 +0200
@@ -1,3 +1,7 @@
+### RQ 1.15.1 (2023-06-20)
+* Fixed a bug that may cause a crash when cleaning intermediate queue. Thanks @selwin!
+* Fixed a bug that may cause canceled jobs to still run dependent jobs. Thanks @fredsod!
+
 ### RQ 1.15 (2023-05-24)
 * Added `Callback(on_stopped='my_callback)`. Thanks @eswolinsky3241!
 * `Callback` now accepts dotted path to function as input. Thanks @rishabh-ranjan!
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/Makefile new/rq-1.15.1/Makefile
--- old/rq-1.15/Makefile        2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/Makefile      2023-06-20 01:28:45.000000000 +0200
@@ -19,3 +19,7 @@
        git push --tags
        python setup.py sdist bdist_wheel
        twine upload dist/*
+
+lint:
+       @ black --check --skip-string-normalization --line-length 120 rq tests
+       @ ruff check --show-source rq tests
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/dev-requirements-36.txt new/rq-1.15.1/dev-requirements-36.txt
--- old/rq-1.15/dev-requirements-36.txt 2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/dev-requirements-36.txt       2023-06-20 01:28:45.000000000 +0200
@@ -3,4 +3,4 @@
 psutil
 pytest
 pytest-cov
-sentry-sdk
\ No newline at end of file
+sentry-sdk
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/__init__.py new/rq-1.15.1/rq/__init__.py
--- old/rq-1.15/rq/__init__.py  2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/__init__.py        2023-06-20 01:28:45.000000000 +0200
@@ -5,4 +5,19 @@
 from .version import VERSION
 from .worker import SimpleWorker, Worker
 
+__all__ = [
+    "Connection",
+    "get_current_connection",
+    "pop_connection",
+    "push_connection",
+    "Callback",
+    "Retry",
+    "cancel_job",
+    "get_current_job",
+    "requeue_job",
+    "Queue",
+    "SimpleWorker",
+    "Worker",
+]
+
 __version__ = VERSION
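
The new `__all__` makes rq's star-import surface explicit: `from rq import *` now binds exactly the names listed above. A quick check, assuming rq 1.15.1 is installed:

    # __all__ enumerates the public names exported by rq/__init__.py
    import rq
    print(sorted(rq.__all__))

    from rq import *  # noqa: F403 -- star-import brings in only __all__
    print(Queue, Worker)
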
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/job.py new/rq-1.15.1/rq/job.py
--- old/rq-1.15/rq/job.py       2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/job.py     2023-06-20 01:28:45.000000000 +0200
@@ -1586,7 +1586,7 @@
             # If parent job is not finished, we should only continue
             # if this job allows parent job to fail
             dependencies_ids.discard(parent_job.id)
-            if parent_job._status == JobStatus.CANCELED:
+            if parent_job.get_status() == JobStatus.CANCELED:
                 return False
             elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
                 return False
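
The functional change in this job.py hunk: `parent_job._status` is whatever was cached when the job object was last loaded, while `get_status()` refreshes from Redis by default, so a cancellation issued by another process is actually seen. A minimal sketch of the difference, assuming a local Redis and the rq 1.15.x API (`print` stands in for a real task):

    from redis import Redis
    from rq import Queue
    from rq.job import Job

    redis = Redis()
    q = Queue(connection=redis)
    job = q.enqueue(print, 'hello')                 # stand-in task

    snapshot = Job.fetch(job.id, connection=redis)  # loads and caches status
    job.cancel()                                    # status changes in Redis

    print(snapshot._status)       # cached value from fetch time -- stale
    print(snapshot.get_status())  # re-reads the job hash: now CANCELED
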
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/maintenance.py new/rq-1.15.1/rq/maintenance.py
--- old/rq-1.15/rq/maintenance.py       2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/maintenance.py     2023-06-20 01:28:45.000000000 +0200
@@ -21,5 +21,6 @@
     for job_id in job_ids:
         if job_id not in queue.started_job_registry:
             job = queue.fetch_job(job_id)
-            worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.')
+            if job:
+                worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.')
             queue.connection.lrem(queue.intermediate_queue_key, 1, job_id)
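
This guard is the crash fix from the changelog: a job id can linger in the intermediate queue after the job's hash has expired or been deleted, in which case `Queue.fetch_job()` returns None and the old code handed None straight to `handle_job_failure()`. The pattern in isolation (a sketch; 'no-such-id' is a hypothetical stale id):

    from redis import Redis
    from rq import Queue

    q = Queue(connection=Redis())

    job = q.fetch_job('no-such-id')  # None when the job data is gone
    if job:
        print('job still exists:', job.id)
    else:
        print('stale id; 1.15.1 now skips the failure handling')
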
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/queue.py new/rq-1.15.1/rq/queue.py
--- old/rq-1.15/rq/queue.py     2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/queue.py   2023-06-20 01:28:45.000000000 +0200
@@ -893,7 +893,7 @@
             kwargs (*kwargs): function kargs
         """
         if not isinstance(f, str) and f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers')
+            raise ValueError('Functions from the __main__ module cannot be processed by workers')
 
         # Detect explicit invocations, i.e. of the form:
         #     q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
@@ -1206,6 +1206,7 @@
                         pipeline=pipe,
                         exclude_job_id=exclude_job_id,
                     )
+                    and dependent_job.get_status(refresh=False) != JobStatus.CANCELED
                 ]
 
                 pipe.multi()
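
The added condition is the second changelog fix: when a finished parent's dependents are gathered for enqueueing, anything already CANCELED is now filtered out instead of being pushed onto the queue. The scenario it addresses, sketched with stand-in tasks (time.sleep / print) against a local Redis:

    import time
    from redis import Redis
    from rq import Queue

    q = Queue(connection=Redis())

    parent = q.enqueue(time.sleep, 5)                   # long-running parent
    child = q.enqueue(print, 'ran', depends_on=parent)  # deferred dependent

    # Cancel the dependent while the parent is still pending.  Before
    # 1.15.1, a worker finishing the parent could enqueue the canceled
    # child anyway; with the fix it stays CANCELED and never runs.
    child.cancel()
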
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/timeouts.py new/rq-1.15.1/rq/timeouts.py
--- old/rq-1.15/rq/timeouts.py  2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/timeouts.py        2023-06-20 01:28:45.000000000 +0200
@@ -60,7 +60,7 @@
 
 class UnixSignalDeathPenalty(BaseDeathPenalty):
     def handle_death_penalty(self, signum, frame):
-        raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout))
+        raise self._exception('Task exceeded maximum timeout value ({0} seconds)'.format(self._timeout))
 
     def setup_death_penalty(self):
         """Sets up an alarm signal and a signal handler that raises
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/version.py new/rq-1.15.1/rq/version.py
--- old/rq-1.15/rq/version.py   2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/version.py 2023-06-20 01:28:45.000000000 +0200
@@ -1 +1 @@
-VERSION = '1.15.0'
+VERSION = '1.15.1'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/rq/worker.py new/rq-1.15.1/rq/worker.py
--- old/rq-1.15/rq/worker.py    2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/rq/worker.py  2023-06-20 01:28:45.000000000 +0200
@@ -456,6 +456,62 @@
             self.teardown()
         return bool(completed_jobs)
 
+    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
+        """
+        Handles the failure or an executing job by:
+            1. Setting the job status to failed
+            2. Removing the job from StartedJobRegistry
+            3. Setting the workers current job to None
+            4. Add the job to FailedJobRegistry
+        `save_exc_to_job` should only be used for testing purposes
+        """
+        self.log.debug('Handling failed execution of job %s', job.id)
+        with self.connection.pipeline() as pipeline:
+            if started_job_registry is None:
+                started_job_registry = StartedJobRegistry(
+                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
+                )
+
+            # check whether a job was stopped intentionally and set the job
+            # status appropriately if it was this job.
+            job_is_stopped = self._stopped_job_id == job.id
+            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
+
+            if job_is_stopped:
+                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
+                self._stopped_job_id = None
+            else:
+                # Requeue/reschedule if retry is configured, otherwise
+                if not retry:
+                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
+
+            started_job_registry.remove(job, pipeline=pipeline)
+
+            if not self.disable_default_exception_handler and not retry:
+                job._handle_failure(exc_string, pipeline=pipeline)
+                with suppress(redis.exceptions.ConnectionError):
+                    pipeline.execute()
+
+            self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_failed_job_count(pipeline)
+            if job.started_at and job.ended_at:
+                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
+
+            if retry:
+                job.retry(queue, pipeline)
+                enqueue_dependents = False
+            else:
+                enqueue_dependents = True
+
+            try:
+                pipeline.execute()
+                if enqueue_dependents:
+                    queue.enqueue_dependents(job)
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
+
     def _start_scheduler(
         self,
         burst: bool = False,
@@ -653,7 +709,7 @@
         connection: Union[Redis, 'Pipeline'] = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, timeout)
         connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
-        self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive in %s seconds.', timeout)
+        self.log.debug('Sent heartbeat to prevent worker timeout. Next one should arrive in %s seconds.', timeout)
 
 
 class Worker(BaseWorker):
@@ -947,7 +1003,7 @@
         if self.get_state() == WorkerStatus.BUSY:
             self._stop_requested = True
             self.set_shutdown_requested_date()
-            self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.')
+            self.log.debug('Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.')
             if self.scheduler:
                 self.stop_scheduler()
         else:
@@ -1294,62 +1350,6 @@
         msg = 'Processing {0} from {1} since {2}'
         self.procline(msg.format(job.func_name, job.origin, time.time()))
 
-    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
-        """
-        Handles the failure or an executing job by:
-            1. Setting the job status to failed
-            2. Removing the job from StartedJobRegistry
-            3. Setting the workers current job to None
-            4. Add the job to FailedJobRegistry
-        `save_exc_to_job` should only be used for testing purposes
-        """
-        self.log.debug('Handling failed execution of job %s', job.id)
-        with self.connection.pipeline() as pipeline:
-            if started_job_registry is None:
-                started_job_registry = StartedJobRegistry(
-                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
-                )
-
-            # check whether a job was stopped intentionally and set the job
-            # status appropriately if it was this job.
-            job_is_stopped = self._stopped_job_id == job.id
-            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
-
-            if job_is_stopped:
-                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
-                self._stopped_job_id = None
-            else:
-                # Requeue/reschedule if retry is configured, otherwise
-                if not retry:
-                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
-
-            started_job_registry.remove(job, pipeline=pipeline)
-
-            if not self.disable_default_exception_handler and not retry:
-                job._handle_failure(exc_string, pipeline=pipeline)
-                with suppress(redis.exceptions.ConnectionError):
-                    pipeline.execute()
-
-            self.set_current_job_id(None, pipeline=pipeline)
-            self.increment_failed_job_count(pipeline)
-            if job.started_at and job.ended_at:
-                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
-
-            if retry:
-                job.retry(queue, pipeline)
-                enqueue_dependents = False
-            else:
-                enqueue_dependents = True
-
-            try:
-                pipeline.execute()
-                if enqueue_dependents:
-                    queue.enqueue_dependents(job)
-            except Exception:
-                # Ensure that custom exception handlers are called
-                # even if Redis is down
-                pass
-
     def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
         """Handles the successful execution of certain job.
         It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
@@ -1498,7 +1498,9 @@
         extra.update({'queue': job.origin, 'job_id': job.id})
 
         # func_name
-        self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra)
+        self.log.error(
+            '[Job %s]: exception raised while executing (%s)\n%s', job.id, func_name, exc_string, extra=extra
+        )
 
         for handler in self._exc_handlers:
             self.log.debug('Invoking exception handler %s', handler)
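
Besides moving handle_job_failure() from Worker up to BaseWorker, this worker.py hunk fixes a genuine logging bug: the old call concatenated the traceback into the format string, so any literal '%' inside the traceback was parsed as a format directive, the formatter failed, and the record was reported to stderr and dropped. Passing the traceback as a lazy %s argument avoids the re-parse. A standalone illustration with the stock logging module (no rq required):

    import logging

    logging.basicConfig(level=logging.ERROR)
    log = logging.getLogger('demo')

    exc_string = 'Traceback ...\nValueError: progress was 100%'

    # Old pattern: exc_string becomes part of the format string; the stray
    # '%' raises "ValueError: incomplete format" inside the handler and the
    # message is lost (logging prints the error to stderr instead).
    log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string,
              'job-id', 'my_func')

    # Fixed pattern: the traceback travels as an argument and is never
    # scanned for '%' directives.
    log.error('[Job %s]: exception raised while executing (%s)\n%s',
              'job-id', 'my_func', exc_string)
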
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-1.15/tests/test_worker.py new/rq-1.15.1/tests/test_worker.py
--- old/rq-1.15/tests/test_worker.py    2023-05-27 04:26:09.000000000 +0200
+++ new/rq-1.15.1/tests/test_worker.py  2023-06-20 01:28:45.000000000 +0200
@@ -4,6 +4,7 @@
 import signal
 import subprocess
 import sys
+import threading
 import time
 import zlib
 from datetime import datetime, timedelta
@@ -268,7 +269,7 @@
         w.perform_job(job, queue)
 
         # An exception should be logged here at ERROR level
-        self.assertIn("Traceback", mock_logger_error.call_args[0][0])
+        self.assertIn("Traceback", mock_logger_error.call_args[0][3])
 
     def test_heartbeat(self):
         """Heartbeat saves last_heartbeat"""
@@ -594,6 +595,91 @@
         # Should not have created evidence of execution
         self.assertEqual(os.path.exists(SENTINEL_FILE), False)
 
+    def test_cancel_running_parent_job(self):
+        """Cancel a running parent job and verify that
+        dependent jobs are not started."""
+
+        def cancel_parent_job(job):
+            while job.is_queued:
+                time.sleep(1)
+
+            job.cancel()
+            return
+
+        q = Queue(
+            "low",
+        )
+        parent_job = q.enqueue(long_running_job, 5)
+
+        job = q.enqueue(say_hello, depends_on=parent_job)
+        job2 = q.enqueue(say_hello, depends_on=job)
+        status_thread = threading.Thread(target=cancel_parent_job, args=(parent_job,))
+        status_thread.start()
+
+        w = Worker([q])
+        w.work(
+            burst=True,
+        )
+        status_thread.join()
+
+        self.assertNotEqual(parent_job.result, None)
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+        self.assertEqual(job.result, None)
+        self.assertEqual(job2.get_status(), JobStatus.DEFERRED)
+        self.assertEqual(job2.result, None)
+        self.assertEqual(q.count, 0)
+
+    def test_cancel_dependent_job(self):
+        """Cancel job and verify that when the parent job is finished,
+        the dependent job is not started."""
+
+        q = Queue(
+            "low",
+        )
+        parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
+        job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
+        job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
+        job.cancel()
+
+        w = Worker([q])
+        w.work(
+            burst=True,
+        )
+        self.assertTrue(job.is_canceled)
+        self.assertNotEqual(parent_job.result, None)
+        self.assertEqual(job.get_status(), JobStatus.CANCELED)
+        self.assertEqual(job.result, None)
+        self.assertEqual(job2.result, None)
+        self.assertEqual(job2.get_status(), JobStatus.DEFERRED)
+        self.assertEqual(q.count, 0)
+
+    def test_cancel_job_enqueue_dependent(self):
+        """Cancel a job in a chain and enqueue the dependent jobs."""
+
+        q = Queue(
+            "low",
+        )
+        parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
+        job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
+        job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
+        job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3")
+
+        job.cancel(enqueue_dependents=True)
+
+        w = Worker([q])
+        w.work(
+            burst=True,
+        )
+        self.assertTrue(job.is_canceled)
+        self.assertNotEqual(parent_job.result, None)
+        self.assertEqual(job.get_status(), JobStatus.CANCELED)
+        self.assertEqual(job.result, None)
+        self.assertNotEqual(job2.result, None)
+        self.assertEqual(job2.get_status(), JobStatus.FINISHED)
+        self.assertEqual(job3.get_status(), JobStatus.FINISHED)
+
+        self.assertEqual(q.count, 0)
+
     @slow
     def test_max_idle_time(self):
         q = Queue()
