Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-rq for openSUSE:Factory checked in at 2026-06-15 19:44:03 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-rq (Old) and /work/SRC/openSUSE:Factory/.python-rq.new.1981 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-rq" Mon Jun 15 19:44:03 2026 rev:20 rq:1359299 version:2.9.1 Changes: -------- --- /work/SRC/openSUSE:Factory/python-rq/python-rq.changes 2026-05-07 15:45:59.477617486 +0200 +++ /work/SRC/openSUSE:Factory/.python-rq.new.1981/python-rq.changes 2026-06-15 19:47:19.573175947 +0200 @@ -1,0 +2,22 @@ +Sun Jun 14 19:23:26 UTC 2026 - Dirk Müller <[email protected]> + +- update to 2.9.1: + * `Job.create()` now supports `retry` argument. Thanks + @sethuvishal! + * Add compatibility with `redis-py` >= 8. Thanks @selwin! + * Added `json` and `pickle` shorthand aliases for serializers. + These can now be used when creating queues/workers and with + the `--serializer` CLI option. Thanks @selwin! + * Fixed a bug where `SpawnWorker` does not use user supplied + serializer. Thanks @selwin! + * `Queue.parse_args()` now returns a `EnqueueArgs` named tuple. + Thanks @libmilos-so! + * Fixed a race condition that could cause worker keys in Redis + to get out of sync when Redis is under load. Thanks + @terencehonles! + * Enqueueing deferred jobs now removes them from + `DeferredJobRegistry`. Thanks @selwin! + * `SpawnWorker` now uses `repr()` when reconstructing worker, + job and queue identifiers in child processes. Thanks @selwin! + +------------------------------------------------------------------- Old: ---- rq-2.8.tar.gz New: ---- rq-2.9.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-rq.spec ++++++ --- /var/tmp/diff_new_pack.PgTYHK/_old 2026-06-15 19:47:20.541216625 +0200 +++ /var/tmp/diff_new_pack.PgTYHK/_new 2026-06-15 19:47:20.545216793 +0200 @@ -29,7 +29,7 @@ %{?sle15_python_module_pythons} Name: python-rq%{psuffix} -Version: 2.8 +Version: 2.9.1 Release: 0 Summary: Easy Job Queues for Python License: Apache-2.0 ++++++ rq-2.8.tar.gz -> rq-2.9.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/.github/workflows/dependencies.yml new/rq-2.9.1/.github/workflows/dependencies.yml --- old/rq-2.8/.github/workflows/dependencies.yml 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/.github/workflows/dependencies.yml 2026-06-06 04:47:31.000000000 +0200 @@ -13,8 +13,8 @@ runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] - redis-version: [3, 4, 5, 6, 7] + python-version: ["3.10", "3.11", "3.12", "3.13"] + redis-version: [5, 6, 7, 8] redis-py-version: [3.5.0] steps: @@ -48,8 +48,8 @@ strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] - redis-version: [3, 4, 5, 6, 7] + python-version: ["3.10", "3.11", "3.12", "3.13"] + redis-version: [5, 6, 7, 8] steps: - uses: actions/checkout@v6 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/.github/workflows/valkey.yml new/rq-2.9.1/.github/workflows/valkey.yml --- old/rq-2.8/.github/workflows/valkey.yml 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/.github/workflows/valkey.yml 2026-06-06 04:47:31.000000000 +0200 @@ -39,7 +39,7 @@ matrix: python-version: ["3.12", "3.13", "3.14"] valkey-version: ["7.2", "8.0", "9.0"] - redis-py-version: ["5", "6", "7"] + redis-py-version: ["5", "6", "7", "8"] steps: - name: Checkout code @@ -54,8 +54,8 @@ run: | python -m pip install --upgrade pip pip install hatch - # Install the specific redis-py version for this matrix combination - pip install redis==${{ matrix.redis-py-version }} + hatch run test:python -m ensurepip --upgrade + hatch run test:python -m pip install redis==${{ matrix.redis-py-version }} - name: Run RQ tests with pytest run: | @@ -66,7 +66,7 @@ VALKEY_PORT: 6379 - name: Upload coverage to Codecov - uses: codecov/[email protected] + uses: codecov/[email protected] with: files: ./coverage.xml fail_ci_if_error: false diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/.github/workflows/workflow.yml new/rq-2.9.1/.github/workflows/workflow.yml --- old/rq-2.8/.github/workflows/workflow.yml 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/.github/workflows/workflow.yml 2026-06-06 04:47:31.000000000 +0200 @@ -37,9 +37,14 @@ timeout-minutes: 10 strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] - redis-version: [5, 6, 7] - redis-py-version: [5, 6] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] + redis-version: [5, 6, 7, 8] + redis-py-version: [5, 6, 8] + exclude: + # redis-py 8 defaults to RESP3, which negotiates via the HELLO command. + # HELLO was added in Redis 6.0, so redis-py 8 cannot connect to Redis 5. + - redis-version: 5 + redis-py-version: 8 steps: - uses: actions/checkout@v6 @@ -58,13 +63,14 @@ run: | python -m pip install --upgrade pip pip install hatch "virtualenv<20.27" - pip install redis==${{ matrix.redis-py-version }} + hatch run test:python -m ensurepip --upgrade + hatch run test:python -m pip install redis==${{ matrix.redis-py-version }} - name: Test with pytest run: | RUN_SLOW_TESTS_TOO=1 hatch run test:cov --durations=5 - name: Upload coverage to Codecov - uses: codecov/[email protected] + uses: codecov/[email protected] with: files: ./coverage.xml fail_ci_if_error: false diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/CHANGES.md new/rq-2.9.1/CHANGES.md --- old/rq-2.8/CHANGES.md 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/CHANGES.md 2026-06-06 04:47:31.000000000 +0200 @@ -1,3 +1,16 @@ +### RQ 2.9.1 (2026-06-06) +* `Job.create()` now supports `retry` argument. Thanks @sethuvishal! +* Add compatibility with `redis-py` >= 8. Thanks @selwin! + +### RQ 2.9.0 (2026-05-19) +* Added `json` and `pickle` shorthand aliases for serializers. These can now be used when creating queues/workers and with the `--serializer` CLI option. Thanks @selwin! +* Fixed a bug where `SpawnWorker` does not use user supplied serializer. Thanks @selwin! +* `Queue.parse_args()` now returns a `EnqueueArgs` named tuple. Thanks @libmilos-so! +* Fixed a race condition that could cause worker keys in Redis to get out of sync when Redis is under load. Thanks @terencehonles! +* Enqueueing deferred jobs now removes them from `DeferredJobRegistry`. Thanks @selwin! +* `SpawnWorker` now uses `repr()` when reconstructing worker, job and queue identifiers in child processes. Thanks @selwin! +* Minor typing and cleanup improvements. Thanks @selwin and @rextea! + ### RQ 2.8.0 (2026-04-16) * Added support for unique jobs. Passing `unique=True` with `job_id` prevents duplicate jobs from being enqueued or scheduled. Thanks @selwin! * `Result` now stores execution metadata (`execution_id`, `execution_started_at` and `execution_ended_at`). Thanks @selwin! diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/README.md new/rq-2.9.1/README.md --- old/rq-2.8/README.md 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/README.md 2026-06-06 04:47:31.000000000 +0200 @@ -189,14 +189,14 @@ cron.register( backup_database, queue_name='maintenance', - cron_string='0 3 * * *' + cron='0 3 * * *' ) # Monthly report on the first day of each month at 8:00 AM cron.register( generate_monthly_report, queue_name='reports', - cron_string='0 8 1 * *' + cron='0 8 1 * *' ) ```python @@ -224,6 +224,12 @@ More options are documented on [python-rq.org](https://python-rq.org/docs/workers/). +## Security + +> **Warning:** RQ uses [`pickle`](https://docs.python.org/3/library/pickle.html#module-pickle) as its default serializer, which **is not secure**. Only run RQ against Redis instances that you trust. It is possible to construct malicious pickle data that will execute arbitrary code during unpickling. + +To avoid pickle, use an alternative serializer, such as `JSONSerializer`, when enqueueing and processing jobs (see [docs](https://python-rq.org/docs/jobs/#job--queue-creation-with-custom-serializer)). JSON only supports primitive argument types (str, int, float, bool, list, dict, None). + ## Installation Simply use the following command to install the latest released version: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/docs/docs/jobs.md new/rq-2.9.1/docs/docs/jobs.md --- old/rq-2.8/docs/docs/jobs.md 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/docs/docs/jobs.md 2026-06-06 04:47:31.000000000 +0200 @@ -242,17 +242,27 @@ ## Job / Queue Creation with Custom Serializer -When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. -Serializers used should have at least `loads` and `dumps` method. -The default serializer used is `pickle`. +When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. Serializers must implement `loads` and `dumps`. The default serializer is `pickle`. + +> **Warning:** RQ uses [`pickle`](https://docs.python.org/3/library/pickle.html#module-pickle) as its default serializer, which **is not secure**. Only run RQ against Redis instances that you trust. It is possible to construct malicious pickle data that will execute arbitrary code during unpickling. + +To avoid pickle, use an alternative serializer, such as `JSONSerializer`, when enqueueing and processing jobs. JSON only supports primitive argument types (str, int, float, bool, list, dict, None). + +For example, to enqueue jobs with the JSON serializer, pass either the `'json'` shorthand or the `JSONSerializer` class itself: ```python from rq import Queue -from rq.job import Job from rq.serializers import JSONSerializer -job = Job(id="my-job", connection=connection, serializer=JSONSerializer) -queue = Queue(connection=connection, serializer=JSONSerializer) +queue = Queue(connection=connection, serializer='json') # or: serializer=JSONSerializer +job = queue.enqueue('my_module.count_words', 'https://example.com') +``` + +Then run workers with the same serializer. The `--serializer` option accepts the shorthands `json` and `pickle`, or a full dotted path such as `rq.serializers.JSONSerializer`: + +```console +$ rq worker --serializer json +$ rq worker --serializer rq.serializers.JSONSerializer ``` ## Accessing The "current" Job from within the job function diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/docs/docs/workers.md new/rq-2.9.1/docs/docs/workers.md --- old/rq-2.8/docs/docs/workers.md 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/docs/docs/workers.md 2026-06-06 04:47:31.000000000 +0200 @@ -70,7 +70,7 @@ * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--disable-job-desc-logging`: Turn off job description logging. * `--max-jobs`: Maximum number of jobs to execute. -* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer") +* `--serializer`: Serializer to use. Accepts `json` or `pickle`, or a dotted import path to your own serializer (e.g. `rq.serializers.JSONSerializer`). _New in version 1.14.0._ * `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`) @@ -208,28 +208,34 @@ ## Worker with Custom Serializer -When creating a worker, you can pass in a custom serializer that will be implicitly passed to the queue. -Serializers used should have at least `loads` and `dumps` method. An example of creating a custom serializer -class can be found in serializers.py (rq.serializers.JSONSerializer). -The default serializer used is `pickle` +When creating a worker, you can pass in a custom serializer that will be used when loading jobs from Redis. Serializers must implement `loads` and `dumps`; see `rq.serializers.JSONSerializer` for an example. The default serializer is `pickle`. + +> **Warning:** RQ uses [`pickle`](https://docs.python.org/3/library/pickle.html#module-pickle) as its default serializer, which **is not secure**. Only run RQ against Redis instances that you trust. It is possible to construct malicious pickle data that will execute arbitrary code during unpickling. + +To process jobs that were enqueued with the JSON serializer, pass either the `'json'` shorthand or the `JSONSerializer` class itself: ```python from rq import Worker from rq.serializers import JSONSerializer -job = Worker('foo', serializer=JSONSerializer) +worker = Worker('foo', serializer='json') # or: serializer=JSONSerializer ``` -or when creating from a queue +You can also use the same serializer when creating the queue object: ```python from rq import Queue, Worker from rq.serializers import JSONSerializer -w = Queue('foo', serializer=JSONSerializer) +queue = Queue('foo', serializer='json') # or: serializer=JSONSerializer +worker = Worker([queue], serializer='json') # or: serializer=JSONSerializer ``` -Queues will now use custom serializer +When starting workers from the command line, pass the serializer (the shorthand `json` resolves to `rq.serializers.JSONSerializer`; a dotted import path also works): + +```console +$ rq worker --serializer json +``` ## Better worker process title @@ -563,6 +569,6 @@ * `-n` or `--num-workers <number of worker>`: defaults to 2. * `-b` or `--burst`: run workers in burst mode (stops after all jobs in queue have been processed). * `-l` or `--logging-level <level>`: defaults to `INFO`. `DEBUG`, `WARNING`, `ERROR` and `CRITICAL` are supported. -* `-S` or `--serializer <path.to.Serializer>`: defaults to `rq.serializers.DefaultSerializer`. `rq.serializers.JSONSerializer` is also included. +* `-S` or `--serializer <serializer>`: accepts the shorthand `json` or `pickle`, or a dotted import path. Defaults to `rq.serializers.PickleSerializer`. * `-P` or `--path <path>`: multiple import paths are supported (e.g `rq worker --path foo --path bar`). * `-j` or `--job-class <path.to.Job>`: defaults to `rq.job.Job`. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/pyproject.toml new/rq-2.9.1/pyproject.toml --- old/rq-2.8/pyproject.toml 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/pyproject.toml 2026-06-06 04:47:31.000000000 +0200 @@ -12,7 +12,7 @@ { name = "Selwin Ong", email = "[email protected]" }, { name = "Vincent Driessen", email = "[email protected]" }, ] -requires-python = ">=3.9" +requires-python = ">=3.10" classifiers = [ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", @@ -26,7 +26,6 @@ "Operating System :: Unix", "Programming Language :: Python", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", @@ -99,7 +98,7 @@ typing = "mypy rq" [tool.ruff] -target-version = "py39" +target-version = "py310" # Set what ruff should check for. # See https://beta.ruff.rs/docs/rules/ for a list of rules. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/cli/helpers.py new/rq-2.9.1/rq/cli/helpers.py --- old/rq-2.8/rq/cli/helpers.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/cli/helpers.py 2026-06-06 04:47:31.000000000 +0200 @@ -408,7 +408,7 @@ '--serializer', '-S', default=DEFAULT_SERIALIZER_CLASS, - help='Path to serializer, defaults to rq.serializers.DefaultSerializer', + help='Serializer to use. Accepts json, pickle, or a dotted import path.', ), ] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/cli/workers.py new/rq-2.9.1/rq/cli/workers.py --- old/rq-2.8/rq/cli/workers.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/cli/workers.py 2026-06-06 04:47:31.000000000 +0200 @@ -3,7 +3,6 @@ import os import sys import warnings -from typing import TYPE_CHECKING, cast import click from redis.exceptions import ConnectionError @@ -23,13 +22,10 @@ DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, ) -from rq.serializers import DefaultSerializer +from rq.serializers import resolve_serializer from rq.suspension import is_suspended from rq.worker_pool import WorkerPool -if TYPE_CHECKING: - from rq.serializers import Serializer - @main.command() @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') @@ -206,10 +202,7 @@ setup_loghandlers_from_args(verbose, quiet, date_format, log_format) - if serializer: - serializer = cast('Serializer', import_attribute(serializer)) - else: - serializer = DefaultSerializer + serializer = resolve_serializer(serializer) # if --verbose or --quiet, use the appropriate logging level if verbose: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/connections.py new/rq-2.9.1/rq/connections.py --- old/rq-2.8/rq/connections.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/connections.py 2026-06-06 04:47:31.000000000 +0200 @@ -6,8 +6,33 @@ pass +# redis-py >= 8 may add RESP3 maintenance-notification handlers and derived +# connection metadata to connection_kwargs. Those values describe the current +# pool's internal state, and some contain locks, so RQ drops them before +# rebuilding a Redis connection in another process. +REDIS_RUNTIME_CONNECTION_KWARGS = ( + 'maint_notifications_config', + 'maint_notifications_pool_handler', + 'event_dispatcher', + 'orig_host_address', + 'orig_socket_timeout', + 'orig_socket_connect_timeout', +) + + +def get_connection_kwargs(connection: Redis) -> dict: + """Return pool kwargs suitable for rebuilding this Redis connection in a child process.""" + kwargs = connection.connection_pool.connection_kwargs.copy() + for key in REDIS_RUNTIME_CONNECTION_KWARGS: + kwargs.pop(key, None) + # redis-py marks unset kwargs (e.g. socket_keepalive_options) with a sentinel object() that it + # recognizes by identity. Pickling/repr across the process boundary creates a new object(), so + # the child treats it as a real value and breaks; drop it so the child re-applies its default. + return {key: value for key, value in kwargs.items() if type(value) is not object} + + def parse_connection(connection: Redis) -> tuple[type[Redis], type[RedisConnection], dict]: - connection_pool_kwargs = connection.connection_pool.connection_kwargs.copy() + connection_pool_kwargs = get_connection_kwargs(connection) connection_pool_class = connection.connection_pool.connection_class return connection.__class__, connection_pool_class, connection_pool_kwargs diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/cron.py new/rq-2.9.1/rq/cron.py --- old/rq-2.8/rq/cron.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/cron.py 2026-06-06 04:47:31.000000000 +0200 @@ -438,7 +438,7 @@ # For now, log the error and continue # Clear the global registry after we're done - _job_data_registry = [] # type: ignore + _job_data_registry.clear() self.log.info(f"Successfully registered {job_count} cron jobs from '{config_path}'") # Method modifies the instance, no need to return self unless chaining is desired diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/job.py new/rq-2.9.1/rq/job.py --- old/rq-2.8/rq/job.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/job.py 2026-06-06 04:47:31.000000000 +0200 @@ -10,7 +10,7 @@ from collections.abc import Callable, Iterable, Mapping, Sequence from datetime import datetime, timedelta from enum import Enum -from typing import TYPE_CHECKING, Any, Optional, cast +from typing import TYPE_CHECKING, Any, cast from uuid import uuid4 from redis import WatchError @@ -248,6 +248,7 @@ description: str | None = None, depends_on: JobDependencyType | None = None, timeout: int | None = None, + retry: Retry | None = None, id: str | None = None, origin: str = '', meta: dict[str, Any] | None = None, @@ -365,6 +366,11 @@ job.meta = meta or {} job.group_id = group_id + if retry is not None: + job.retries_left = retry.max + job.enqueue_at_front_on_retry = retry.enqueue_at_front + job.retry_intervals = retry.intervals + # Process job dependencies if depends_on is not None: job.process_dependencies(depends_on) @@ -548,7 +554,7 @@ self._stopped_callback = None # After deserialization, _stopped_callback is either a callable or None, never UNEVALUATED - return cast(Optional[Callable[['Job', 'Redis'], Any]], self._stopped_callback) + return cast(Callable[['Job', 'Redis'], Any] | None, self._stopped_callback) @property def stopped_callback_timeout(self) -> int: @@ -1360,7 +1366,7 @@ self.enqueue_at_front = self.enqueue_at_front or depends_on_item.enqueue_at_front self.allow_dependency_failures = self.allow_dependency_failures or depends_on_item.allow_failure depends_on_list.extend(list(depends_on_item.dependencies)) - elif isinstance(depends_on_item, (Job, str)): + elif isinstance(depends_on_item, Job | str): depends_on_list.append(depends_on_item) else: raise ValueError( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/queue.py new/rq-2.9.1/rq/queue.py --- old/rq-2.8/rq/queue.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/queue.py 2026-06-06 04:47:31.000000000 +0200 @@ -12,7 +12,7 @@ from typing import ( TYPE_CHECKING, Any, - Optional, + NamedTuple, cast, ) @@ -72,6 +72,30 @@ __slots__ = () +class EnqueueArgs(NamedTuple): + """Helper type to use when calling Queue.parse_args""" + + func: str | Callable[..., Any] + timeout: int | str | None + description: str | None + result_ttl: int | None + ttl: int | None + failure_ttl: int | None + depends_on: JobDependencyType | None + job_id: str | None + at_front: bool + meta: dict | None + retry: Retry | None + repeat: Repeat | None + on_success: Callback | Callable | None + on_failure: Callback | Callable | None + on_stopped: Callback | Callable | None + pipeline: Pipeline | None + unique: bool + args: tuple | list | None + kwargs: dict | None + + @total_ordering class Queue: job_class: type[Job] = Job @@ -960,7 +984,7 @@ args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) - return ( + return EnqueueArgs( f, timeout, description, @@ -1244,6 +1268,7 @@ """ self.log.debug('Enqueueing job %s to queue %s (at_front=%s)', job.id, self.name, at_front) + is_deferred = job.get_status(refresh=False) == JobStatus.DEFERRED self._prepare_for_queue(job) if unique: @@ -1253,6 +1278,8 @@ else: pipe = pipeline if pipeline is not None else self.connection.pipeline() + if is_deferred: + self.deferred_job_registry.remove(job, pipeline=pipe) self._persist_job(job, pipe) self.push_job_id(job.id, pipeline=pipe, at_front=at_front) @@ -1275,6 +1302,7 @@ """ self.log.debug('Enqueueing job %s to queue %s (sync execution)', job.id, self.name) + is_deferred = job.get_status(refresh=False) == JobStatus.DEFERRED self._prepare_for_queue(job) if unique: @@ -1283,6 +1311,8 @@ else: pipe = pipeline if pipeline is not None else self.connection.pipeline() + if is_deferred: + self.deferred_job_registry.remove(job, pipeline=pipe) self._persist_job(job, pipe) if pipeline is None: @@ -1504,7 +1534,7 @@ raise DequeueTimeout(timeout, queue_key) return queue_key, result else: # non-blocking variant - result = cast(Optional[Any], connection.lmove(queue_key, intermediate_queue.key)) + result = cast(Any | None, connection.lmove(queue_key, intermediate_queue.key)) if result is not None: return queue_key, result return None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/registry.py new/rq-2.9.1/rq/registry.py --- old/rq-2.8/rq/registry.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/registry.py 2026-06-06 04:47:31.000000000 +0200 @@ -10,6 +10,7 @@ from rq.serializers import resolve_serializer +from .connections import get_connection_kwargs from .defaults import DEFAULT_FAILURE_TTL from .exceptions import AbandonedJobError, InvalidJobOperation, NoSuchJobError from .job import Job, JobStatus @@ -66,9 +67,11 @@ return self.count def __eq__(self, other): - return ( - self.name == other.name - and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs + # Compare the portable connection kwargs (stripped of per-connection runtime state such as + # redis-py 8's maintenance-notification objects), so a connection rebuilt from + # parse_connection still compares equal to the original it was derived from. + return self.name == other.name and get_connection_kwargs(self.connection) == get_connection_kwargs( + other.connection ) def __contains__(self, item: Any) -> bool: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/scripts.py new/rq-2.9.1/rq/scripts.py --- old/rq-2.8/rq/scripts.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/scripts.py 2026-06-06 04:47:31.000000000 +0200 @@ -2,6 +2,7 @@ import logging import time from datetime import timedelta, timezone +from typing import Any from .exceptions import DuplicateJobError from .logutils import blue, green @@ -44,7 +45,7 @@ """ # Cache registered scripts per connection -_registered_scripts = {} +_registered_scripts: dict[Any, Any] = {} def get_unique_enqueue_script(connection): @@ -150,7 +151,7 @@ return hset_args -_registered_schedule_scripts = {} +_registered_schedule_scripts: dict[Any, Any] = {} def get_unique_schedule_script(connection): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/serializers.py new/rq-2.9.1/rq/serializers.py --- old/rq-2.8/rq/serializers.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/serializers.py 2026-06-06 04:47:31.000000000 +0200 @@ -4,7 +4,7 @@ import pickle from collections.abc import Callable from functools import partial -from typing import Any, ClassVar, Protocol, runtime_checkable +from typing import Any, ClassVar, Protocol, cast, runtime_checkable from .utils import import_attribute @@ -21,6 +21,9 @@ loads: ClassVar[Callable[[bytes], Any]] = pickle.loads +PickleSerializer = DefaultSerializer + + class JSONSerializer: @staticmethod def dumps(*args, **kwargs): @@ -31,6 +34,12 @@ return json.loads(s.decode('utf-8'), *args, **kwargs) +SERIALIZER_ALIASES: dict[str, Serializer] = { + 'json': JSONSerializer, + 'pickle': PickleSerializer, +} + + def resolve_serializer(serializer: Serializer | str | None = None) -> Serializer: """This function checks the user defined serializer for ('dumps', 'loads') methods It returns a default pickle serializer if not found else it returns a MySerializer @@ -44,12 +53,13 @@ serializer (Callable): An object that implements the SerializerProtocol """ if not serializer: - return DefaultSerializer + return PickleSerializer if isinstance(serializer, str): - serializer = import_attribute(serializer) # type: ignore[assignment] - - assert not isinstance(serializer, str) + serializer_path = serializer + serializer = SERIALIZER_ALIASES.get(serializer_path) + if not serializer: + serializer = cast(Serializer, import_attribute(serializer_path)) if not isinstance(serializer, Serializer): raise NotImplementedError('Serializer should have (dumps, loads) methods.') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/types.py new/rq-2.9.1/rq/types.py --- old/rq-2.8/rq/types.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/types.py 2026-06-06 04:47:31.000000000 +0200 @@ -2,7 +2,7 @@ from collections.abc import Callable from types import TracebackType -from typing import TYPE_CHECKING, Any, Optional, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar if TYPE_CHECKING: from typing import TypeAlias @@ -27,5 +27,5 @@ SuccessCallbackType = Callable[['Job', 'Redis', Any], Any] FailureCallbackType = Callable[ - ['Job', 'Redis', Optional[type[BaseException]], Optional[BaseException], Optional[TracebackType]], Any + ['Job', 'Redis', type[BaseException] | None, BaseException | None, TracebackType | None], Any ] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/version.py new/rq-2.9.1/rq/version.py --- old/rq-2.8/rq/version.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/version.py 2026-06-06 04:47:31.000000000 +0200 @@ -1 +1 @@ -VERSION = '2.8.0' +VERSION = '2.9.1' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/worker/base.py new/rq-2.9.1/rq/worker/base.py --- old/rq-2.8/rq/worker/base.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/worker/base.py 2026-06-06 04:47:31.000000000 +0200 @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import logging import math import os @@ -177,6 +178,12 @@ self.version: str = VERSION self.python_version: str = sys.version + if serializer is None or isinstance(serializer, str): + self._serializer_arg: str | None = serializer + elif inspect.ismodule(serializer): + self._serializer_arg = serializer.__name__ + else: + self._serializer_arg = f'{serializer.__module__}.{serializer.__qualname__}' # type: ignore[attr-defined] self.serializer = resolve_serializer(serializer) self.execution: Execution | None = None @@ -216,7 +223,8 @@ self.failed_job_count: int = 0 self.total_working_time: float = 0 self.current_job_working_time: float = 0 - self.birth_date = None + self.birth_date: datetime | None = None + self.last_heartbeat: datetime | None = None self.scheduler: RQScheduler | None = None self.pubsub: PubSub | None = None self.pubsub_thread = None @@ -233,7 +241,7 @@ self.pid = None self.ip_address = 'unknown' - if isinstance(exception_handlers, (list, tuple)): + if isinstance(exception_handlers, list | tuple): for handler in exception_handlers: self.push_exc_handler(handler) elif exception_handlers is not None: @@ -872,35 +880,34 @@ else: self.scheduler.start() + def serialize(self) -> dict: + assert self.birth_date is not None and self.last_heartbeat is not None + return { + 'birth': utcformat(self.birth_date), + 'last_heartbeat': utcformat(self.last_heartbeat), + 'queues': ','.join(self.queue_names()), + 'pid': self.pid, + 'hostname': self.hostname, + 'ip_address': self.ip_address, + 'version': self.version, + 'python_version': self.python_version, + } + def register_birth(self): """Registers its own birth.""" self.log.debug('Worker %s: registering birth', self.name) - if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'): + key = self.key + if self.connection.exists(key) and not self.connection.hexists(key, 'death'): msg = 'There exists an active worker named {0!r} already' raise ValueError(msg.format(self.name)) - key = self.key - queues = ','.join(self.queue_names()) - with self.connection.pipeline() as p: - p.delete(key) - right_now = now() - now_in_string = utcformat(right_now) - self.birth_date = right_now - - mapping = { - 'birth': now_in_string, - 'last_heartbeat': now_in_string, - 'queues': queues, - 'pid': self.pid, - 'hostname': self.hostname, - 'ip_address': self.ip_address, - 'version': self.version, - 'python_version': self.python_version, - } - - p.hset(key, mapping=mapping) - worker_registration.register(self, p) - p.expire(key, self.worker_ttl + 60) - p.execute() + with self.connection.pipeline() as pipeline: + pipeline.delete(key) + + self.birth_date = self.last_heartbeat = now() + pipeline.hset(key, mapping=self.serialize()) + worker_registration.register(self, pipeline) + pipeline.expire(key, self.worker_ttl + 60) + pipeline.execute() def register_death(self): """Registers its own death.""" @@ -1159,8 +1166,9 @@ """ timeout = timeout or self.worker_ttl + 60 connection: Redis | Pipeline = pipeline if pipeline is not None else self.connection + self.last_heartbeat = now() + connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) connection.expire(self.key, timeout) - connection.hset(self.key, 'last_heartbeat', utcformat(now())) self.log.debug( 'Worker %s: sent heartbeat to prevent worker timeout. Next one should arrive in %s seconds.', self.name, @@ -1175,7 +1183,9 @@ # Also need to update execution's heartbeat self.execution.heartbeat(job.started_job_registry, ttl, pipeline=pipeline) # type: ignore + # After transition to job execution is complete, `job.heartbeat()` is no longer needed + job_heartbeat_index = len(pipeline) job.heartbeat(now(), ttl, pipeline=pipeline, xx=True) results = pipeline.execute() @@ -1188,9 +1198,15 @@ # heartbeat() command. If a new key was created, this means the job was already # deleted. In this case, we simply send another delete command to remove the key. # https://github.com/rq/rq/issues/1450 + if results[job_heartbeat_index] == 1: + pipeline.delete(job.key) + + # like above, check if the worker's hash expired before `self.heartbeat` was able to + # update the expiration + if results[0] == 1: + pipeline.hset(self.key, mapping=self.serialize()) - if results[7] == 1: - self.connection.delete(job.key) + pipeline.execute() def teardown(self): if not self.is_horse: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/worker/worker_classes.py new/rq-2.9.1/rq/worker/worker_classes.py --- old/rq-2.8/rq/worker/worker_classes.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/worker/worker_classes.py 2026-06-06 04:47:31.000000000 +0200 @@ -9,6 +9,7 @@ from random import shuffle from typing import TYPE_CHECKING +from ..connections import get_connection_kwargs from ..defaults import DEFAULT_WORKER_TTL from ..exceptions import InvalidJobOperation, ShutDownImminentException from ..job import Job, JobStatus @@ -166,7 +167,7 @@ os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_EXECUTION_ID'] = self.execution.id # type: ignore - redis_kwargs = self.connection.connection_pool.connection_kwargs.copy() + redis_kwargs = get_connection_kwargs(self.connection) if redis_kwargs.get('retry'): # Remove retry from connection kwargs to avoid issues with os.spawnv del redis_kwargs['retry'] @@ -188,14 +189,14 @@ from rq.executions import Execution # Recreate worker instance -redis = Redis(**{redis_kwargs}) -worker = Worker.find_by_key("{self.key}", connection=redis) +redis = Redis(**{redis_kwargs!r}) +worker = Worker.find_by_key({self.key!r}, connection=redis, serializer={self._serializer_arg!r}) if not worker: sys.exit(1) # Reconstruct job, queue and execution objects -job = Job.fetch("{job.id}", connection=worker.connection) -queue = Queue("{queue.name}", connection=worker.connection) +job = Job.fetch({job.id!r}, connection=worker.connection, serializer=worker.serializer) +queue = Queue({queue.name!r}, connection=worker.connection, serializer=worker.serializer) execution_id = os.environ["RQ_EXECUTION_ID"] worker.execution = Execution.fetch(execution_id, job.id, connection=worker.connection) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/rq/worker_registration.py new/rq-2.9.1/rq/worker_registration.py --- old/rq-2.8/rq/worker_registration.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/rq/worker_registration.py 2026-06-06 04:47:31.000000000 +0200 @@ -74,7 +74,8 @@ redis = queue.connection redis_key = WORKERS_BY_QUEUE_KEY % queue.name else: - redis = connection # type: ignore + assert connection is not None + redis = connection redis_key = REDIS_WORKER_KEYS return {as_text(key) for key in redis.smembers(redis_key)} diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/Dockerfile new/rq-2.9.1/tests/Dockerfile --- old/rq-2.8/tests/Dockerfile 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/Dockerfile 2026-06-06 04:47:31.000000000 +0200 @@ -25,17 +25,8 @@ redis-server \ python3-pip \ stunnel \ - python3.9 \ - python3.9-dev \ - python3.9-distutils \ python3.10 \ - python3.10-dev \ - python3.11 \ - python3.11-dev \ - python3.12 \ - python3.12-dev \ - python3.13 \ - python3.13-dev + python3.10-dev RUN apt-get clean && \ rm -rf /var/lib/apt/lists/* diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/fixtures.py new/rq-2.9.1/tests/fixtures.py --- old/rq-2.8/tests/fixtures.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/fixtures.py 2026-06-06 04:47:31.000000000 +0200 @@ -16,6 +16,7 @@ from rq import Queue, get_current_job from rq.command import send_kill_horse_command, send_shutdown_command +from rq.connections import get_connection_kwargs from rq.defaults import DEFAULT_JOB_MONITORING_INTERVAL from rq.job import Job from rq.suspension import resume @@ -280,7 +281,7 @@ """ Use multiprocessing to start a new worker in a separate process. """ - conn_kwargs = connection.connection_pool.connection_kwargs + conn_kwargs = get_connection_kwargs(connection) p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst, job_monitoring_interval)) p.start() return p diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_cli.py new/rq-2.9.1/tests/test_cli.py --- old/rq-2.8/tests/test_cli.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_cli.py 2026-06-06 04:47:31.000000000 +0200 @@ -294,6 +294,25 @@ result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assert_normal_execution(result) + def test_worker_and_worker_pool_serializer_json_alias(self): + """rq worker/worker-pool -u <url> -b --serializer json""" + runner = CliRunner() + + queue = Queue('worker-json', connection=self.connection, serializer=JSONSerializer) + job = queue.enqueue(say_hello, 'Hello') + result = runner.invoke(main, ['worker', 'worker-json', '-u', self.redis_url, '-b', '--serializer', 'json']) + self.assert_normal_execution(result) + self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED) + + queue = Queue('worker-pool-json', connection=self.connection, serializer=JSONSerializer) + job = queue.enqueue(say_hello, 'Hello') + result = runner.invoke( + main, + ['worker-pool', 'worker-pool-json', '-u', self.redis_url, '-b', '--serializer', 'json'], + ) + self.assert_normal_execution(result) + self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED) + def test_worker_pid(self): """rq worker -u <url> /tmp/..""" pid = self.tmpdir.join('rq.pid') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_commands.py new/rq-2.9.1/tests/test_commands.py --- old/rq-2.8/tests/test_commands.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_commands.py 2026-06-06 04:47:31.000000000 +0200 @@ -7,6 +7,7 @@ from rq import Queue, Worker from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command +from rq.connections import get_connection_kwargs from rq.exceptions import InvalidJobOperation, NoSuchJobError from rq.serializers import JSONSerializer from rq.worker import WorkerStatus @@ -30,9 +31,7 @@ connection = self.connection worker = Worker('foo', connection=connection) - p = Process( - target=_send_shutdown_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy()) - ) + p = Process(target=_send_shutdown_command, args=(worker.name, get_connection_kwargs(connection))) p.start() worker.work() p.join(1) @@ -73,16 +72,14 @@ job = queue.enqueue(long_running_job, 4) worker = Worker('foo', connection=connection) - p = Process( - target=_send_kill_horse_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy()) - ) + p = Process(target=_send_kill_horse_command, args=(worker.name, get_connection_kwargs(connection))) p.start() worker.work(burst=True) p.join(1) job.refresh() self.assertIn(job.id, queue.failed_job_registry) - p = Process(target=start_work, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy())) + p = Process(target=start_work, args=('foo', worker.name, get_connection_kwargs(connection))) p.start() p.join(2) @@ -108,9 +105,7 @@ with self.assertRaises(NoSuchJobError): send_stop_job_command(connection, job_id='1', serializer=JSONSerializer) - p = Process( - target=start_work_burst, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy()) - ) + p = Process(target=start_work_burst, args=('foo', worker.name, get_connection_kwargs(connection))) p.start() p.join(1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_cron.py new/rq-2.9.1/tests/test_cron.py --- old/rq-2.8/tests/test_cron.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_cron.py 2026-06-06 04:47:31.000000000 +0200 @@ -12,6 +12,7 @@ from redis import Redis from rq import Queue, utils +from rq.connections import get_connection_kwargs from rq.cron import CronJob, CronScheduler, _job_data_registry from rq.cron_scheduler_registry import get_keys, get_registry_key from rq.exceptions import SchedulerNotFound @@ -782,31 +783,44 @@ def test_sigint_handling(self): """Test that sending SIGINT to the process stops the scheduler""" - conn_kwargs = self.connection.connection_pool.connection_kwargs + conn_kwargs = get_connection_kwargs(self.connection) scheduler_process = Process(target=run_scheduler, args=(conn_kwargs,)) scheduler_process.start() assert scheduler_process.pid - time.sleep(0.2) + # Ensure scheduler is registered (name will have random suffix) scheduler_prefix = f'{socket.gethostname()}:{scheduler_process.pid}:' - # Find scheduler with matching prefix - matching_scheduler = None - for key in get_keys(self.connection): - if key.startswith(scheduler_prefix): - matching_scheduler = key - break - - self.assertTrue(matching_scheduler) - - os.kill(scheduler_process.pid, signal.SIGINT) - - scheduler_process.join(timeout=2) - self.assertFalse(scheduler_process.is_alive()) - - # Verify scheduler is no longer registered - keys = get_keys(self.connection) - self.assertEqual([key for key in keys if key.startswith(scheduler_prefix)], []) + try: + # Find scheduler with matching prefix + deadline = time.time() + 5 + matching_scheduler = None + keys = [] + while time.time() < deadline and scheduler_process.is_alive(): + keys = get_keys(self.connection) + matching_scheduler = next((key for key in keys if key.startswith(scheduler_prefix)), None) + if matching_scheduler: + break + time.sleep(0.05) + + self.assertTrue( + matching_scheduler, + f'Cron scheduler {scheduler_prefix} was not registered. ' + f'Process exitcode: {scheduler_process.exitcode}. Registered schedulers: {keys}', + ) + + os.kill(scheduler_process.pid, signal.SIGINT) + + scheduler_process.join(timeout=2) + self.assertFalse(scheduler_process.is_alive()) + + # Verify scheduler is no longer registered + keys = get_keys(self.connection) + self.assertEqual([key for key in keys if key.startswith(scheduler_prefix)], []) + finally: + if scheduler_process.is_alive(): + scheduler_process.terminate() + scheduler_process.join(timeout=2) def test_last_heartbeat_property(self): """Test that last_heartbeat property works correctly in all scenarios""" diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_dependencies.py new/rq-2.9.1/tests/test_dependencies.py --- old/rq-2.8/tests/test_dependencies.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_dependencies.py 2026-06-06 04:47:31.000000000 +0200 @@ -1,6 +1,7 @@ from multiprocessing import Process from rq import Queue, SimpleWorker, Worker +from rq.connections import get_connection_kwargs from rq.job import Dependency, Job, JobStatus from rq.utils import current_timestamp from tests import RQTestCase @@ -232,9 +233,9 @@ job_c = queue.enqueue(check_dependencies_are_met, job_id='C', depends_on=['A', 'B']) job_c.dependencies_are_met() - w = Worker([queue], connection=self.connection) + w = SimpleWorker([queue], connection=self.connection) w.work(burst=True) - assert job_c.result + assert job_c.return_value(refresh=True) def test_allow_failures_when_work_horse_killed(self): """Ensure that allow_failure is respected when a worker is killed""" @@ -243,7 +244,7 @@ job2 = queue.enqueue(say_hello, depends_on=Dependency(jobs=job, allow_failure=True)) # Wait 1 second before killing the horse to simulate horse terminating unexpectedly - p = Process(target=kill_horse, args=('horse_pid_key', self.connection.connection_pool.connection_kwargs, 1)) + p = Process(target=kill_horse, args=('horse_pid_key', get_connection_kwargs(self.connection), 1)) p.start() worker = Worker([queue], connection=self.connection) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_fixtures.py new/rq-2.9.1/tests/test_fixtures.py --- old/rq-2.8/tests/test_fixtures.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_fixtures.py 2026-06-06 04:47:31.000000000 +0200 @@ -1,16 +1,17 @@ from rq import Queue +from rq.connections import get_connection_kwargs from tests import RQTestCase, fixtures class TestFixtures(RQTestCase): def test_rpush_fixture(self): - connection_kwargs = self.connection.connection_pool.connection_kwargs + connection_kwargs = get_connection_kwargs(self.connection) fixtures.rpush('foo', 'bar', connection_kwargs) assert self.connection.lrange('foo', 0, 0)[0].decode() == 'bar' def test_start_worker_fixture(self): queue = Queue(name='testing', connection=self.connection) queue.enqueue(fixtures.say_hello) - conn_kwargs = self.connection.connection_pool.connection_kwargs + conn_kwargs = get_connection_kwargs(self.connection) fixtures.start_worker(queue.name, conn_kwargs, 'w1', True) assert not queue.jobs diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_job_dependency.py new/rq-2.9.1/tests/test_job_dependency.py --- old/rq-2.8/tests/test_job_dependency.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_job_dependency.py 2026-06-06 04:47:31.000000000 +0200 @@ -3,6 +3,7 @@ from redis import WatchError +from rq.connections import get_connection_kwargs from rq.job import Dependency, Job, JobStatus, cancel_job from rq.queue import Queue from rq.registry import ( @@ -446,7 +447,7 @@ queue = Queue(connection=self.connection) key = 'test_job:job_order' - connection_kwargs = self.connection.connection_pool.connection_kwargs + connection_kwargs = get_connection_kwargs(self.connection) # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued. # Worker 1 will be busy with the slow job, so worker 2 will complete both fast jobs. job_slow = queue.enqueue(fixtures.rpush, args=[key, 'slow', connection_kwargs, True, 0.5], job_id='slow_job') @@ -476,7 +477,7 @@ """Test that jobs with dependencies are executed in the correct order.""" queue = Queue(connection=self.connection) key = 'test_job:job_order' - connection_kwargs = self.connection.connection_pool.connection_kwargs + connection_kwargs = get_connection_kwargs(self.connection) # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued. job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, 'slow_1', connection_kwargs, True, 0.5], job_id='slow_1') job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, 'slow_2', connection_kwargs, True, 0.75], job_id='slow_2') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_queue.py new/rq-2.9.1/tests/test_queue.py --- old/rq-2.8/tests/test_queue.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_queue.py 2026-06-06 04:47:31.000000000 +0200 @@ -44,11 +44,11 @@ def test_create_queue_with_serializer(self): """Creating queues with serializer.""" - # Test using json serializer q = Queue('queue-with-serializer', connection=self.connection, serializer=json) - self.assertEqual(q.name, 'queue-with-serializer') - self.assertEqual(str(q), '<Queue queue-with-serializer>') - self.assertIsNotNone(q.serializer) + self.assertIs(q.serializer, json) + + q = Queue('queue-with-serializer', connection=self.connection, serializer='json') + self.assertIs(q.serializer, JSONSerializer) def test_create_default_queue(self): """Instantiating the default queue.""" @@ -561,6 +561,19 @@ self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.get_status(), JobStatus.QUEUED) + def test_enqueue_deferred_job_removes_it_from_deferred_registry(self): + """Enqueueing a deferred job removes it from DeferredJobRegistry.""" + q = Queue(connection=self.connection) + job = Job.create(func=say_hello, connection=self.connection, origin=q.name, status=JobStatus.DEFERRED) + job.save() + q.deferred_job_registry.add(job) + + q._enqueue_job(job) + + self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertNotIn(job, q.deferred_job_registry) + def test_enqueue_job_with_dependency_and_pipeline(self): """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline.""" # Job with unfinished dependency is not immediately enqueued diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_retry.py new/rq-2.9.1/tests/test_retry.py --- old/rq-2.8/tests/test_retry.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_retry.py 2026-06-06 04:47:31.000000000 +0200 @@ -134,6 +134,18 @@ # status should be queued self.assertEqual(job.get_status(), JobStatus.QUEUED) + def test_job_create_with_retry(self): + """Job.create(..., retry=...) works properly""" + queue = Queue(connection=self.connection) + + retry = Retry(max=3, interval=5, enqueue_at_front=True) + job = Job.create(div_by_zero, retry=retry, connection=self.connection) + queue.enqueue_job(job) + + self.assertEqual(job.retries_left, 3) + self.assertEqual(job.retry_intervals, [5]) + self.assertTrue(job.enqueue_at_front_on_retry) + def test_retry_interval(self): """Retries with intervals are scheduled""" connection = self.connection diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_serializers.py new/rq-2.9.1/tests/test_serializers.py --- old/rq-2.8/tests/test_serializers.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_serializers.py 2026-06-06 04:47:31.000000000 +0200 @@ -4,7 +4,7 @@ import queue import unittest -from rq.serializers import DefaultSerializer, resolve_serializer +from rq.serializers import DefaultSerializer, JSONSerializer, PickleSerializer, resolve_serializer class TestSerializers(unittest.TestCase): @@ -12,9 +12,10 @@ """Ensure function resolve_serializer works correctly""" serializer = resolve_serializer(None) self.assertIsNotNone(serializer) - self.assertEqual(serializer, DefaultSerializer) + self.assertEqual(serializer, PickleSerializer) + self.assertIs(DefaultSerializer, PickleSerializer) - # Test round trip with default serializer + # Test round trip with pickle serializer test_data = {'test': 'data'} serialized_data = serializer.dumps(test_data) self.assertEqual(serializer.loads(serialized_data), test_data) @@ -38,3 +39,9 @@ # Test using path.to.serializer string serializer = resolve_serializer('tests.fixtures.Serializer') self.assertIsNotNone(serializer) + + # Shorthand aliases + self.assertIs(resolve_serializer('json'), JSONSerializer) + self.assertIs(resolve_serializer('pickle'), PickleSerializer) + self.assertIs(resolve_serializer('rq.serializers.PickleSerializer'), PickleSerializer) + self.assertIs(resolve_serializer('rq.serializers.DefaultSerializer'), PickleSerializer) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_spawn_worker.py new/rq-2.9.1/tests/test_spawn_worker.py --- old/rq-2.8/tests/test_spawn_worker.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_spawn_worker.py 2026-06-06 04:47:31.000000000 +0200 @@ -4,10 +4,14 @@ from datetime import timezone from multiprocessing import Process +from redis import Redis + from rq import Queue +from rq.connections import get_connection_kwargs from rq.job import Job from rq.registry import FailedJobRegistry, FinishedJobRegistry from rq.results import Result +from rq.serializers import JSONSerializer from rq.worker import SpawnWorker from tests import RQTestCase, slow from tests.fixtures import ( @@ -35,24 +39,44 @@ self.assertEqual(registry.get_job_ids(), []) def test_filters_non_serializable_connection_kwargs(self): - """SpawnWorker filters out non-serializable kwargs like driver_info before spawning.""" + """SpawnWorker strips connection-local runtime objects before rebuilding Redis in the child. + + redis-py 8 adds an unpicklable maintenance-notification handler to connection_kwargs; + get_connection_kwargs (used by fork_work_horse) must drop it so the + kwargs can be rebuilt in the spawned process. + """ + connection = Redis() + conn_kwargs = connection.connection_pool.connection_kwargs + conn_kwargs['maint_notifications_pool_handler'] = object() + + redis_kwargs = get_connection_kwargs(connection) + + self.assertNotIn('maint_notifications_pool_handler', redis_kwargs) + self.assertIn('maint_notifications_pool_handler', conn_kwargs) + + def test_worker_normalizes_serializer_arg_for_spawn(self): + """_serializer_arg is normalized to str|None so it can be safely embedded in the child source.""" + import json + + from rq.serializers import PickleSerializer, resolve_serializer + queue = Queue('foo', connection=self.connection) - worker = SpawnWorker([queue]) - # Inject a non-serializable driver_info into connection kwargs - conn_kwargs = worker.connection.connection_pool.connection_kwargs - conn_kwargs['driver_info'] = object() - self.addCleanup(conn_kwargs.pop, 'driver_info', None) - - # SpawnWorker should sanitize a copy before building the script. - redis_kwargs = worker.connection.connection_pool.connection_kwargs.copy() - if redis_kwargs.get('retry'): - del redis_kwargs['retry'] - if redis_kwargs.get('driver_info'): - del redis_kwargs['driver_info'] + worker = SpawnWorker([queue]) + self.assertIsNone(worker._serializer_arg) + self.assertIs(resolve_serializer(worker._serializer_arg), PickleSerializer) - self.assertNotIn('driver_info', redis_kwargs) - self.assertIn('driver_info', worker.connection.connection_pool.connection_kwargs) + worker = SpawnWorker([queue], serializer='json') + self.assertEqual(worker._serializer_arg, 'json') + self.assertIs(resolve_serializer(worker._serializer_arg), JSONSerializer) + + worker = SpawnWorker([queue], serializer=JSONSerializer) + self.assertEqual(worker._serializer_arg, 'rq.serializers.JSONSerializer') + self.assertIs(resolve_serializer(worker._serializer_arg), JSONSerializer) + + worker = SpawnWorker([queue], serializer=json) + self.assertEqual(worker._serializer_arg, 'json') + self.assertIs(resolve_serializer(worker._serializer_arg), JSONSerializer) def test_invalid_job_id_is_rejected_before_spawn(self): queue = Queue('foo', connection=self.connection) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_worker.py new/rq-2.9.1/tests/test_worker.py --- old/rq-2.8/tests/test_worker.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_worker.py 2026-06-06 04:47:31.000000000 +0200 @@ -19,6 +19,7 @@ import redis.exceptions from rq import Queue, SimpleWorker, Worker +from rq.connections import get_connection_kwargs from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_WORKER_TTL from rq.job import Job, JobStatus, Retry from rq.registry import FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry @@ -88,6 +89,10 @@ w = Worker(Queue('foo', connection=self.connection), serializer=json) self.assertEqual(w.queues[0].name, 'foo') + # With queue name string and serializer alias + w = Worker('foo', serializer='json', connection=self.connection) + self.assertIs(w.serializer, JSONSerializer) + def test_work_and_quit(self): """Worker processes work, then quits.""" fooq, barq = Queue('foo', connection=self.connection), Queue('bar', connection=self.connection) @@ -1035,7 +1040,7 @@ # Suspend the worker, and then send resume command in the background q.enqueue(say_hello) - p = Process(target=resume_worker, args=(self.connection.connection_pool.connection_kwargs.copy(), 2)) + p = Process(target=resume_worker, args=(get_connection_kwargs(self.connection), 2)) p.start() w.worker_ttl = 1 w.work(max_jobs=1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tests/test_worker_pool.py new/rq-2.9.1/tests/test_worker_pool.py --- old/rq-2.8/tests/test_worker_pool.py 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tests/test_worker_pool.py 2026-06-06 04:47:31.000000000 +0200 @@ -3,7 +3,7 @@ from multiprocessing import Process from time import sleep -from rq.connections import parse_connection +from rq.connections import get_connection_kwargs, parse_connection from rq.job import JobStatus from rq.queue import Queue from rq.serializers import JSONSerializer @@ -44,7 +44,7 @@ worker_data = list(pool.worker_dict.values())[0] sleep(0.5) - _send_shutdown_command(worker_data.name, self.connection.connection_pool.connection_kwargs.copy(), delay=0) + _send_shutdown_command(worker_data.name, get_connection_kwargs(self.connection), delay=0) # 1 worker should be dead since we sent a shutdown command sleep(0.75) pool.check_workers(respawn=False) @@ -67,7 +67,7 @@ worker_data = list(pool.worker_dict.values())[0] sleep(0.5) - _send_shutdown_command(worker_data.name, self.connection.connection_pool.connection_kwargs.copy(), delay=0) + _send_shutdown_command(worker_data.name, get_connection_kwargs(self.connection), delay=0) # 1 worker should be dead since we sent a shutdown command sleep(0.75) pool.reap_workers() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rq-2.8/tox.ini new/rq-2.9.1/tox.ini --- old/rq-2.8/tox.ini 2026-04-17 02:19:17.000000000 +0200 +++ new/rq-2.9.1/tox.ini 2026-06-06 04:47:31.000000000 +0200 @@ -1,6 +1,6 @@ [tox] isolated_build = True -envlist=py39,py310,py311,py312,py313 +envlist=py310,py311,py312,py313 [testenv] commands=pytest --cov rq --cov-config=.coveragerc --durations=5 {posargs} @@ -19,11 +19,6 @@ ; commands = ; ruff check rq tests -[testenv:py39] -skipdist = True -basepython = python3.9 -deps = {[testenv]deps} - [testenv:py310] skipdist = True basepython = python3.10
