Previously, if backend started a bit faster than job-grabber, it
was fine, but the other way around (because job-grabber is
prerequisite for backend in service file):

  1. Job Grabber started
  2. JobGrabber took the list of tasks
  3. JobGrabber filled the redis task queue && and filled its
     internal representation of queue
  4. Backend started, which resulted in redis task queue cleanup
  5. JobGrabber was confused because the filled redis queue did
     not match its internal queue representation state

This resulted in unprocessed queue.

Now backend precisely tells job_grabber via "different" (Queue)
channel about backend re-start (so backend does not have to touch
the Queue itself and the inconsistency does not happen).
---
 backend/backend/daemons/backend.py    | 32 ++----------
 backend/backend/daemons/dispatcher.py | 17 +++---
 backend/backend/daemons/job_grab.py   | 97 ++++++++++++++++++++++-------------
 backend/backend/helpers.py            | 13 +++++
 backend/backend/jobgrabcontrol.py     | 80 +++++++++++++++++++++++++++++
 5 files changed, 166 insertions(+), 73 deletions(-)
 create mode 100644 backend/backend/jobgrabcontrol.py

diff --git a/backend/backend/daemons/backend.py 
b/backend/backend/daemons/backend.py
index 934a566..8b54bf1 100644
--- a/backend/backend/daemons/backend.py
+++ b/backend/backend/daemons/backend.py
@@ -15,13 +15,13 @@ from collections import defaultdict
 import lockfile
 from daemon import DaemonContext
 from requests import RequestException
-from retask.queue import Queue
 from retask import ConnectionError
 from backend.frontend import FrontendClient
 
 from ..exceptions import CoprBackendError
 from ..helpers import BackendConfigReader, get_redis_logger
 from .dispatcher import Worker
+from .. import jobgrabcontrol
 
 
 class CoprBackend(object):
@@ -42,7 +42,7 @@ class CoprBackend(object):
             raise CoprBackendError("Must specify config_file")
 
         self.config_file = config_file
-        self.ext_opts = ext_opts  # to stow our cli options for read_conf()
+        self.ext_opts = ext_opts  # to show our cli options for read_conf()
         self.workers_by_group_id = defaultdict(list)
         self.max_worker_num_by_group_id = defaultdict(int)
 
@@ -55,35 +55,14 @@ class CoprBackend(object):
         self.log = get_redis_logger(self.opts, "backend.main", "backend")
 
         self.frontend_client = FrontendClient(self.opts, self.log)
+        self.jg_control = jobgrabcontrol.Channel(self.opts, self.log)
         self.is_running = False
 
-    def clean_task_queues(self):
-        """
-        Make sure there is nothing in our task queues
-        """
-        try:
-            for queue in self.task_queues.values():
-                while queue.length:
-                    queue.dequeue()
-        except ConnectionError:
-            raise CoprBackendError(
-                "Could not connect to a task queue. Is Redis running?")
-
     def init_task_queues(self):
         """
-        Connect to the retask.Queue for each group_id. Remove old tasks from 
queues.
+        Remove old tasks from queues.
         """
-        try:
-            for group in self.opts.build_groups:
-                group_id = group["id"]
-                queue = Queue("copr-be-{0}".format(group_id))
-                queue.connect()
-                self.task_queues[group_id] = queue
-        except ConnectionError:
-            raise CoprBackendError(
-                "Could not connect to a task queue. Is Redis running?")
-
-        self.clean_task_queues()
+        self.jg_control.backend_start()
 
     def update_conf(self):
         """
@@ -158,7 +137,6 @@ class CoprBackend(object):
             for w in self.workers_by_group_id[group_id][:]:
                 self.workers_by_group_id[group_id].remove(w)
                 w.terminate_instance()
-        self.clean_task_queues()
 
         try:
             self.log.info("Rescheduling unfinished builds before stop")
diff --git a/backend/backend/daemons/dispatcher.py 
b/backend/backend/daemons/dispatcher.py
index 10375ba..ae3f460 100644
--- a/backend/backend/daemons/dispatcher.py
+++ b/backend/backend/daemons/dispatcher.py
@@ -7,8 +7,6 @@ import shutil
 import multiprocessing
 from setproctitle import setproctitle
 
-from retask.queue import Queue
-
 from ..vm_manage.manager import VmManager
 from ..exceptions import MockRemoteError, CoprWorkerError, VmError, 
NoVmAvailable
 from ..job import BuildJob
@@ -16,6 +14,7 @@ from ..mockremote import MockRemote
 from ..constants import BuildStatus, JOB_GRAB_TASK_END_PUBSUB, build_log_format
 from ..helpers import register_build_result, get_redis_connection, 
get_redis_logger, \
     local_file_logger
+from .. import jobgrabcontrol
 
 
 # ansible_playbook = "ansible-playbook"
@@ -32,8 +31,6 @@ class Worker(multiprocessing.Process):
     Worker process dispatches building tasks. Backend spin-up multiple 
workers, each
     worker associated to one group_id and process one task at the each moment.
 
-    Worker listens for the new tasks from :py:class:`retask.Queue` associated 
with its group_id
-
     :param Munch opts: backend config
     :param int worker_num: worker number
     :param int group_id: group_id from the set of groups defined in config
@@ -51,9 +48,7 @@ class Worker(multiprocessing.Process):
 
         self.log = get_redis_logger(self.opts, self.logger_name, "worker")
 
-        # job management stuff
-        self.task_queue = Queue("copr-be-{0}".format(str(group_id)))
-        self.task_queue.connect()
+        self.jg = jobgrabcontrol.Channel(self.opts, self.log)
         # event queue for communicating back to dispatcher
 
         self.kill_received = False
@@ -238,14 +233,16 @@ class Worker(multiprocessing.Process):
         # this sometimes caused TypeError in random worker
         # when another one  picekd up a task to build
         # why?
+        # praiskup: not reproduced
         try:
-            task = self.task_queue.dequeue()
-        except TypeError:
+            task = self.jg.get_build(self.group_id)
+        except TypeError as err:
+            self.log.warning(err)
             return
         if not task:
             return
 
-        job = BuildJob(task.data, self.opts)
+        job = BuildJob(task, self.opts)
         self.update_process_title(suffix="Task: {} chroot: {}, obtained at {}"
                                   .format(job.build_id, job.chroot, 
str(datetime.now())))
 
diff --git a/backend/backend/daemons/job_grab.py 
b/backend/backend/daemons/job_grab.py
index 21871ae..7bca75b 100644
--- a/backend/backend/daemons/job_grab.py
+++ b/backend/backend/daemons/job_grab.py
@@ -10,8 +10,6 @@ import time
 from setproctitle import setproctitle
 
 from requests import get, RequestException
-from retask.task import Task
-from retask.queue import Queue
 
 from backend.frontend import FrontendClient
 
@@ -19,11 +17,22 @@ from ..actions import Action
 from ..constants import JOB_GRAB_TASK_END_PUBSUB
 from ..helpers import get_redis_connection, get_redis_logger
 from ..exceptions import CoprJobGrabError
-
+from .. import jobgrabcontrol
 
 # TODO: Replace entire model with asynchronous queue, so that frontend push 
task,
 # and workers listen for them
-
+# praiskup: Please don't.  I doubt this would help too much, and I really don't
+# think it is worth another rewrite.  Reasons (imho):
+#   a. there still needs to be "one" organizator, aka jobgrabber on the backend
+#      VM side -- we do not want allow Workers to contact frontend directly
+#      because of (1) security and (2) process synchronization.
+#   b. in frontend, we _never_ want to block UI differently than on database,
+#      so the push to BE can't be done instantly -- and thus there would have
+#      to be something like buffered "JobPusher" (and that would be most
+#      probably implemented as poll anyway).  Maybe we could use some "pipe"
+#      approach through infinite (http?) connection, or opened database
+#      connection, .. but I don't think it does matter too much who will
+#      control the "pipe".
 class CoprJobGrab(object):
 
     """
@@ -36,41 +45,33 @@ class CoprJobGrab(object):
     :param Munch opts: backend config
     :param lock: :py:class:`multiprocessing.Lock` global backend lock
 
+    TODO: Not yet fully ready for config reload.
     """
 
     def __init__(self, opts):
         """ base class initialization """
 
         self.opts = opts
+
+        # Maps e.g. x86_64 && i386 => PC (.
         self.arch_to_group_id_map = dict()
-        for group in self.opts.build_groups:
-            for arch in group["archs"]:
-                self.arch_to_group_id_map[arch] = group["id"]
-
-        self.task_queues_by_arch = {}
-        self.task_queues_by_group = {}
-
-        self.added_jobs_dict = dict()  # task_id -> task dict
-
+        # PC => max N builders per user
+        self.group_to_usermax = dict()
+        # task_id -> task dict
+        self.added_jobs_dict = dict()
 
         self.rc = None
         self.channel = None
         self.ps_thread = None
 
         self.log = get_redis_logger(self.opts, "backend.job_grab", "job_grab")
+        self.jg_control = jobgrabcontrol.Channel(self.opts, self.log)
         self.frontend_client = FrontendClient(self.opts, self.log)
 
-    def connect_queues(self):
-        """
-        Connects to the retask queues. One queue per builders group.
-        """
-        for group in self.opts.build_groups:
-            queue = Queue("copr-be-{0}".format(group["id"]))
-            queue.connect()
 
-            self.task_queues_by_group[group["name"]] = queue
-            for arch in group["archs"]:
-                self.task_queues_by_arch[arch] = queue
+    def group(self, arch):
+        return self.arch_to_group_id_map[arch]
+
 
     def listen_to_pubsub(self):
         """
@@ -84,6 +85,7 @@ class CoprJobGrab(object):
 
         self.log.info("Subscribed to {} 
channel".format(JOB_GRAB_TASK_END_PUBSUB))
 
+
     def route_build_task(self, task):
         """
         Route build task to the appropriate queue.
@@ -101,24 +103,23 @@ class CoprJobGrab(object):
         if "task_id" in task:
             if task["task_id"] not in self.added_jobs_dict:
                 arch = task["chroot"].split("-")[2]
-                if arch not in self.task_queues_by_arch:
-                    raise CoprJobGrabError("No builder group for architecture: 
{}, task: {}"
-                                           .format(arch, task))
+                group = self.group(arch)
 
                 username = task["project_owner"]
-                group_id = int(self.arch_to_group_id_map[arch])
                 active_jobs_count = len([t for t_id, t in 
self.added_jobs_dict.items()
                                          if t["project_owner"] == username])
 
-                if active_jobs_count > 
self.opts.build_groups[group_id]["max_vm_per_user"]:
-                    self.log.debug("User can not acquire more VM (active 
builds #{}), "
+                if active_jobs_count > self.group_to_usermax[group]:
+                    self.log.debug("User can not acquire more VM (active 
builds #{0}), "
                                    "don't schedule more 
tasks".format(active_jobs_count))
                     return 0
 
+                msg = "enqueue task for user {0}: id={1}, arch={2}, group={3}, 
active={4}"
+                self.log.debug(msg.format(username, task["task_id"], arch, 
group, active_jobs_count))
+
+                # Add both to local list and control channel queue.
                 self.added_jobs_dict[task["task_id"]] = task
-
-                task_obj = Task(task)
-                self.task_queues_by_arch[arch].enqueue(task_obj)
+                self.jg_control.add_build(group, task)
                 count += 1
 
         else:
@@ -226,22 +227,46 @@ class CoprJobGrab(object):
             self.log.debug("Added jobs after remove and load: 
{}".format(self.added_jobs_dict))
             self.log.debug("# of executed jobs: 
{}".format(len(self.added_jobs_dict)))
 
-        for group, queue in self.task_queues_by_group.items():
-            if queue.length > 0:
-                self.log.debug("# of pending jobs for `{}`: {}".format(group, 
queue.length))
+
+    def init_internal_structures(self):
+        self.arch_to_group_id_map = dict()
+        self.group_to_usermax = dict()
+        for group in self.opts.build_groups:
+            group_id = group["id"]
+            for arch in group["archs"]:
+                self.arch_to_group_id_map[arch] = group_id
+                self.log.debug("mapping {0} to {1} group".format(arch, 
group_id))
+
+            self.log.debug("user might use only {0}VMs for {1} 
group".format(group["max_vm_per_user"], group_id))
+            self.group_to_usermax[group_id] = group["max_vm_per_user"]
+
+        self.added_jobs_dict = dict()
+
+
+    def handle_control_channel(self):
+        if not self.jg_control.backend_started():
+            return
+        self.log.info("backend gave us signal to start")
+        self.init_internal_structures()
+        self.jg_control.remove_all_builds()
+        self.jg_control.job_graber_initialized()
 
     def run(self):
         """
         Starts job grabber process
         """
         setproctitle("CoprJobGrab")
-        self.connect_queues()
         self.listen_to_pubsub()
 
         self.log.info("JobGrub started.")
+
+        self.init_internal_structures()
         try:
             while True:
                 try:
+                    # This effectively delays job_grabbing until backend
+                    # gives as signal to start.
+                    self.handle_control_channel()
                     self.load_tasks()
                     self.log_queue_info()
                     time.sleep(self.opts.sleeptime)
diff --git a/backend/backend/helpers.py b/backend/backend/helpers.py
index 3e9c028..fa1a3e5 100644
--- a/backend/backend/helpers.py
+++ b/backend/backend/helpers.py
@@ -11,6 +11,7 @@ import ConfigParser
 import os
 import sys
 import errno
+import time
 from contextlib import contextmanager
 
 import traceback
@@ -28,6 +29,18 @@ from backend.constants import DEF_BUILD_USER, 
DEF_BUILD_TIMEOUT, DEF_CONSECUTIVE
     CONSECUTIVE_FAILURE_REDIS_KEY, default_log_format
 from backend.exceptions import CoprBackendError
 
+
+def wait_log(log, reason="I don't know why.", timeout=5):
+    """
+    We need to wait a while, this should happen only when copr converges to
+    boot-up/restart/..
+    """
+    if not log:
+        return
+    log.warning("I'm waiting {0}s because: {1}".format(timeout, reason))
+    time.sleep(timeout)
+
+
 class SortedOptParser(optparse.OptionParser):
     """Optparser which sorts the options by opt before outputting --help"""
 
diff --git a/backend/backend/jobgrabcontrol.py 
b/backend/backend/jobgrabcontrol.py
new file mode 100644
index 0000000..c8d7414
--- /dev/null
+++ b/backend/backend/jobgrabcontrol.py
@@ -0,0 +1,80 @@
+from retask.queue import Queue
+from retask.task import Task
+
+from .helpers import wait_log
+
+class Channel(object):
+    """
+    Abstraction above retask (the set of "channels" between backend(s),
+    jobgrabber and workers).  We could use multiple backends and/or diffferent
+    "atomic" medium (other implemntation than Queue) in future.  But
+    make sure nobody needs to touch the "medium" directly.
+    """
+
+    def __init__(self, opts, log=None):
+        self.log = log
+        self.opts = opts
+        # channel for Backend <--> JobGrabber communication
+        self.jg_start = Queue("jg_control_start")
+        # channel for JobGrabber <--> [[Builders]] communication
+        self.build_queues = dict()
+        while not self.jg_start.connect():
+            wait_log("waiting for redis", 5)
+
+    def _get_queue(self, bgroup):
+        if not bgroup in self.build_queues:
+            q_id = "copr-be-{0}".format(bgroup)
+            q = Queue(q_id)
+            if not q.connect():
+                # As we already connected to jg_control_message, this should
+                # be also OK.
+                raise Exception("can't connect to redis, should never happen!")
+            return q
+
+        return self.build_queues[bgroup]
+
+    def add_build(self, bgroup, build):
+        """ this should be used by job_grab only for now """
+        q = self._get_queue(bgroup)
+        try:
+            q.enqueue(Task(build))
+        except Exception as err:
+            # I've seen isses Task() was not able to jsonify urllib exceptions
+            if not self.log:
+                return False
+            self.log.error("can't enqueue build {0}, reason:\n{1}".format(
+                build, err
+            ))
+
+        return True
+
+    # Builder's API
+    def get_build(self, bgroup):
+        """
+        Return task from queue or return 0
+        """
+        q = self._get_queue(bgroup)
+        t = q.dequeue()
+        return t.data if t else None
+
+    # JobGrab's API
+    def backend_started(self):
+        return self.jg_start.length
+
+    def job_graber_initialized(self):
+        while self.jg_start.dequeue():
+            pass
+
+    def remove_all_builds(self):
+        for bgroup in self.build_queues:
+            q = self._get_queue(bgroup)
+            while q.dequeue():
+                pass
+        self.build_queues = dict()
+
+    # Backend's API
+    def backend_start(self):
+        """ Notify jobgrab about service start. """
+        self.jg_start.enqueue("start")
+        while self.jg_start.length:
+            wait_log(self.log, "waiting until jobgrabber initializes queue")
-- 
2.5.0
_______________________________________________
copr-devel mailing list
[email protected]
https://lists.fedorahosted.org/admin/lists/[email protected]

Reply via email to