changeset de73634a21f1 in trytond:6.0
details: https://hg.tryton.org/trytond?cmd=changeset&node=de73634a21f1
description:
Initialize the pool before running queue or cron tasks
issue11400
review384981002
(grafted from b52363a1d96234cf8cf0f955a12752c5a22f6508)
diffstat:
trytond/cron.py | 5 +++++
trytond/worker.py | 28 ++++++++++++++++++----------
2 files changed, 23 insertions(+), 10 deletions(-)
diffs (94 lines):
diff -r 2fa2c08bfde2 -r de73634a21f1 trytond/cron.py
--- a/trytond/cron.py Mon Apr 18 19:40:01 2022 +0200
+++ b/trytond/cron.py Tue Apr 19 13:21:42 2022 +0200
@@ -5,6 +5,7 @@
import logging
from trytond.pool import Pool
+from trytond.transaction import Transaction
__all__ = ['run']
logger = logging.getLogger(__name__)
@@ -27,7 +28,11 @@
logger.info(
'skip "%s" as previous cron still running', db_name)
continue
+ database_list = Pool.database_list()
pool = Pool(db_name)
+ if db_name not in database_list:
+ with Transaction().start(db_name, 0, readonly=True):
+ pool.init()
Cron = pool.get('ir.cron')
thread = threading.Thread(
target=Cron.run,
diff -r 2fa2c08bfde2 -r de73634a21f1 trytond/worker.py
--- a/trytond/worker.py Mon Apr 18 19:40:01 2022 +0200
+++ b/trytond/worker.py Tue Apr 19 13:21:42 2022 +0200
@@ -21,19 +21,22 @@
class Queue(object):
- def __init__(self, pool, mpool):
- self.database = backend.Database(pool.database_name).connect()
+ def __init__(self, database_name, mpool):
+ self.database = backend.Database(database_name).connect()
self.connection = self.database.get_connection(autocommit=True)
- self.pool = pool
self.mpool = mpool
def pull(self, name=None):
- Queue = self.pool.get('ir.queue')
+ database_list = Pool.database_list()
+ pool = Pool(self.database.name)
+ if self.database.name not in database_list:
+ with Transaction().start(self.database.name, 0, readonly=True):
+ pool.init()
+ Queue = pool.get('ir.queue')
return Queue.pull(self.database, self.connection, name=name)
def run(self, task_id):
- return self.mpool.apply_async(
- run_task, (self.pool.database_name, task_id))
+ return self.mpool.apply_async(run_task, (self.database.name, task_id))
class TaskList(list):
@@ -54,8 +57,9 @@
processes = 1
logger.info("start %d workers", processes)
mpool = MPool(
- processes, initializer, (options,), options.maxtasksperchild)
- queues = [Queue(pool, mpool) for pool in initializer(options, False)]
+ processes, initializer, (options.database_names,),
+ options.maxtasksperchild)
+ queues = [Queue(name, mpool) for name in options.database_names]
tasks = TaskList()
timeout = options.timeout
@@ -81,12 +85,12 @@
mpool.close()
-def initializer(options, worker=True):
+def initializer(database_names, worker=True):
if worker:
signal.signal(signal.SIGINT, signal.SIG_IGN)
pools = []
database_list = Pool.database_list()
- for database_name in options.database_names:
+ for database_name in database_names:
pool = Pool(database_name)
if database_name not in database_list:
with Transaction().start(database_name, 0, readonly=True):
@@ -97,7 +101,11 @@
def run_task(pool, task_id):
if not isinstance(pool, Pool):
+ database_list = Pool.database_list()
pool = Pool(pool)
+ if pool.database_name not in database_list:
+ with Transaction().start(pool.database_name, 0, readonly=True):
+ pool.init()
Queue = pool.get('ir.queue')
name = '<Task %s@%s>' % (task_id, pool.database_name)
logger.info('%s started', name)