This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/one-cache-size-job-2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 627a0179a96ce45babee17e4388d34b95429be9c
Author: Tristan Van Berkom <[email protected]>
AuthorDate: Mon Jan 7 11:20:11 2019 -0500

    Scheduler: Introduced JobStatus instead of simple success boolean
    
    This changes the deepest callback from when a Job completes to
    propagate a JobStatus value instead of a simple boolean, and updates
    all of the affected code paths which used to receive a boolean
    to now handle the JobStatus values.
    
    This further improves the situation for issue #753, as now we avoid
    queueing cache size jobs for pull jobs which are skipped.
---
 buildstream/_frontend/app.py                |  6 ++---
 buildstream/_scheduler/__init__.py          |  2 +-
 buildstream/_scheduler/jobs/__init__.py     |  1 +
 buildstream/_scheduler/jobs/cachesizejob.py |  6 ++---
 buildstream/_scheduler/jobs/cleanupjob.py   |  6 ++---
 buildstream/_scheduler/jobs/elementjob.py   |  6 ++---
 buildstream/_scheduler/jobs/job.py          | 36 +++++++++++++++++++++++++----
 buildstream/_scheduler/queues/buildqueue.py |  6 ++---
 buildstream/_scheduler/queues/fetchqueue.py |  5 ++--
 buildstream/_scheduler/queues/pullqueue.py  |  8 ++++---
 buildstream/_scheduler/queues/queue.py      | 13 +++++------
 buildstream/_scheduler/queues/trackqueue.py |  5 ++--
 buildstream/_scheduler/scheduler.py         |  6 ++---
 13 files changed, 68 insertions(+), 38 deletions(-)

diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 063ca1e..af38ae9 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
 from .._stream import Stream
 from .._versions import BST_FORMAT_VERSION
 from .. import _yaml
-from .._scheduler import ElementJob
+from .._scheduler import ElementJob, JobStatus
 
 # Import frontend assets
 from . import Profile, LogLine, Status
@@ -515,13 +515,13 @@ class App():
         self._status.add_job(job)
         self._maybe_render_status()
 
-    def _job_completed(self, job, success):
+    def _job_completed(self, job, status):
         self._status.remove_job(job)
         self._maybe_render_status()
 
         # Dont attempt to handle a failure if the user has already opted to
         # terminate
-        if not success and not self.stream.terminated:
+        if status == JobStatus.FAIL and not self.stream.terminated:
 
             if isinstance(job, ElementJob):
                 element = job.element
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index b6e3eeb..4708598 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
 from .queues.pullqueue import PullQueue
 
 from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob
+from .jobs import ElementJob, JobStatus
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 4b0b11d..3e21317 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -20,3 +20,4 @@
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
 from .cleanupjob import CleanupJob
+from .job import JobStatus
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index d46fd4c..6e4698a 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
 #  Author:
 #        Tristan Daniël Maat <[email protected]>
 #
-from .job import Job
+from .job import Job, JobStatus
 
 
 class CacheSizeJob(Job):
@@ -30,8 +30,8 @@ class CacheSizeJob(Job):
     def child_process(self):
         return self._artifacts.compute_cache_size()
 
-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)
 
             if self._complete_cb:
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 8bdbba0..e579e97 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
 #  Author:
 #        Tristan Daniël Maat <[email protected]>
 #
-from .job import Job
+from .job import Job, JobStatus
 
 
 class CleanupJob(Job):
@@ -29,6 +29,6 @@ class CleanupJob(Job):
     def child_process(self):
         return self._artifacts.clean()
 
-    def parent_complete(self, success, result):
-        if success:
+    def parent_complete(self, status, result):
+        if status == JobStatus.OK:
             self._artifacts.set_cache_size(result)
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c06..fa0d34f 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -60,7 +60,7 @@ from .job import Job
 #     Args:
 #        job (Job): The job object which completed
 #        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
+#        status (JobStatus): The status of whether the workload raised an exception
 #        result (object): The deserialized object returned by the `action_cb`, or None
 #                         if `success` is False
 #
@@ -93,8 +93,8 @@ class ElementJob(Job):
         # Run the action
         return self._action_cb(self._element)
 
-    def parent_complete(self, success, result):
-        self._complete_cb(self, self._element, success, self._result)
+    def parent_complete(self, status, result):
+        self._complete_cb(self, self._element, status, self._result)
 
     def message(self, message_type, message, **kwargs):
         args = dict(kwargs)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 3ab7bfa..2b08096 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -43,6 +43,22 @@ RC_PERM_FAIL = 2
 RC_SKIPPED = 3
 
 
+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+    # Job succeeded
+    OK = 0
+
+    # A temporary BstError was raised
+    FAIL = 1
+
+    # A SkipJob was raised
+    SKIPPED = 3
+
+
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
@@ -297,10 +313,10 @@ class Job():
     # pass the result to the main thread.
     #
     # Args:
-    #    success (bool): Whether the job was successful.
+    #    status (JobStatus): The job exit status
     #    result (any): The result returned by child_process().
     #
-    def parent_complete(self, success, result):
+    def parent_complete(self, status, result):
         raise ImplError("Job '{kind}' does not implement parent_complete()"
                         .format(kind=type(self).__name__))
 
@@ -571,9 +587,19 @@ class Job():
             self.spawn()
             return
 
-        success = returncode in (RC_OK, RC_SKIPPED)
-        self.parent_complete(success, self._result)
-        self._scheduler.job_completed(self, success)
+        # Resolve the outward facing overall job completion status
+        #
+        if returncode == RC_OK:
+            status = JobStatus.OK
+        elif returncode == RC_SKIPPED:
+            status = JobStatus.SKIPPED
+        elif returncode in (RC_FAIL, RC_PERM_FAIL):
+            status = JobStatus.FAIL
+        else:
+            status = JobStatus.FAIL
+
+        self.parent_complete(status, self._result)
+        self._scheduler.job_completed(self, status)
 
         # Force the deletion of the queue and process objects to try and clean up FDs
         self._queue = self._process = None
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index d053275..9d8e718 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,7 @@
 from datetime import timedelta
 
 from . import Queue, QueueStatus
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 from ..._message import MessageType
 
@@ -104,7 +104,7 @@ class BuildQueue(Queue):
         if artifacts.has_quota_exceeded():
             self._scheduler.check_cache_size()
 
-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):
 
         # Inform element in main process that assembly is done
         element._assemble_done()
@@ -117,5 +117,5 @@ class BuildQueue(Queue):
         #        artifact cache size for a successful build even though we know a
         #        failed build also grows the artifact cache size.
         #
-        if success:
+        if status == JobStatus.OK:
             self._check_cache_size(job, element, result)
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index c58bfdb..fc11fd1 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -24,6 +24,7 @@ from ... import Consistency
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 
 
 # A queue which fetches element sources
@@ -66,9 +67,9 @@ class FetchQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         element._update_state()
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b861373..dbeb806 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 from ..._exceptions import SkipJob
 
 
@@ -54,9 +55,9 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         element._pull_done()
@@ -64,4 +65,5 @@ class PullQueue(Queue):
         # Build jobs will check the "approximate" size first. Since we
         # do not get an artifact size from pull jobs, we have to
         # actually check the cache size.
-        self._scheduler.check_cache_size()
+        if status == JobStatus.OK:
+            self._scheduler.check_cache_size()
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 055e2f8..707fcf5 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ from enum import Enum
 import traceback
 
 # Local imports
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
@@ -133,10 +133,9 @@ class Queue():
     #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
-    #    success (bool): True if the process() implementation did not
-    #                    raise any exception
+    #    status (JobStatus): The return status of the Job
     #
-    def done(self, job, element, result, success):
+    def done(self, job, element, result, status):
         pass
 
     #####################################################
@@ -291,7 +290,7 @@ class Queue():
     #
     # See the Job object for an explanation of the call signature
     #
-    def _job_done(self, job, element, success, result):
+    def _job_done(self, job, element, status, result):
 
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
@@ -301,7 +300,7 @@ class Queue():
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            self.done(job, element, result, success)
+            self.done(job, element, result, status)
         except BstError as e:
 
             # Report error and mark as failed
@@ -337,7 +336,7 @@ class Queue():
             # if they are not skipped.
             if job.skipped:
                 self.skipped_elements.append(element)
-            elif success:
+            elif status == JobStatus.OK:
                 self.processed_elements.append(element)
             else:
                 self.failed_elements.append(element)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 5144180..245b528 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
+from ..jobs import JobStatus
 
 
 # A queue which tracks sources
@@ -47,9 +48,9 @@ class TrackQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, _, element, result, success):
+    def done(self, _, element, result, status):
 
-        if not success:
+        if status == JobStatus.FAIL:
             return
 
         # Set the new refs in the main process one by one as they complete
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index ecbfef3..97335de 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -237,14 +237,14 @@ class Scheduler():
     # Args:
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
+    #    status (JobStatus): The status of the completed job
     #
-    def job_completed(self, job, success):
+    def job_completed(self, job, status):
         self._resources.clear_job_resources(job)
         self.active_jobs.remove(job)
         if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
             self._exclusive_active.remove(job.action_name)
-        self._job_complete_callback(job, success)
+        self._job_complete_callback(job, status)
         self._schedule_queue_jobs()
         self._sched()
 

Reply via email to