Repository: aurora
Updated Branches:
  refs/heads/master aae39a81e -> 13055df3b


Ensure Thermos is not overloaded by an unlimited number of lost processes

Included changes:

* Thermos may consider launched processes to be LOST. Instead of
  restarting those immediately, the restarts will now be at least
  `min_duration` seconds apart. Restarts will also be capped at the
  TOTAL_RUN_LIMIT of 100 restarts. This ensures neither Thermos nor the
  observer will be overloaded by checkpoints. The handling of the LOST
  state is now consistent with the handling of both FAILED and FINISHED.
* Mark `success_transition` and `failure_transition` as private. They
  are only used within `TaskPlanner` itself.
* Fix the documented default of `min_duration` (i.e. 5s rather than 15s).

Reviewed at https://reviews.apache.org/r/60306/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/13055df3
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/13055df3
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/13055df3

Branch: refs/heads/master
Commit: 13055df3b93031fba859f61e5185611bf1ce5f4c
Parents: aae39a8
Author: Stephan Erb <s...@apache.org>
Authored: Thu Jun 22 00:49:40 2017 +0200
Committer: Stephan Erb <s...@apache.org>
Committed: Thu Jun 22 00:49:40 2017 +0200

----------------------------------------------------------------------
 docs/reference/configuration.md                 |  2 +-
 .../python/apache/thermos/common/planner.py     | 40 ++++++++++++++------
 .../apache/thermos/common/test_task_planner.py  | 14 +++++--
 3 files changed, 39 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/13055df3/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index 6a9a3ff..bc7e098 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -44,7 +44,7 @@ behavior with its optional attributes. Remember, Processes are handled by Thermo
    **max_failures**   | Integer     | Maximum process failures (Default: 1)
    **daemon**         | Boolean     | When True, this is a daemon process. (Default: False)
    **ephemeral**      | Boolean     | When True, this is an ephemeral process. (Default: False)
-   **min_duration**   | Integer     | Minimum duration between process restarts in seconds. (Default: 15)
+   **min_duration**   | Integer     | Minimum duration between process restarts in seconds. (Default: 5)
    **final**          | Boolean     | When True, this process is a finalizing one that should run last. (Default: False)
    **logger**         | Logger      | Struct defining the log behavior for the process. (Default: Empty)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/13055df3/src/main/python/apache/thermos/common/planner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/planner.py b/src/main/python/apache/thermos/common/planner.py
index da5120f..5c23d86 100644
--- a/src/main/python/apache/thermos/common/planner.py
+++ b/src/main/python/apache/thermos/common/planner.py
@@ -199,6 +199,7 @@ class TaskPlanner(object):
     self._last_terminal = {}  # process => timestamp of last terminal state
     self._failures = defaultdict(int)
     self._successes = defaultdict(int)
+    self._losses = defaultdict(int)
     self._attributes = {}
     self._ephemerals = set(process.name().get() for process in task.processes()
         if (self._filter is None or self._filter(process)) and process.ephemeral().get())
@@ -216,6 +217,10 @@ class TaskPlanner(object):
       return 0
     return self._attributes[process].min_duration - (now - self._last_terminal[process])
 
+  def _record_termination_time(self, process, timestamp=None):
+    timestamp = timestamp if timestamp is not None else self._clock.time()
+    self._last_terminal[process] = timestamp
+
   def is_ready(self, process, timestamp=None):
     return self.get_wait(process, timestamp) <= 0
 
@@ -253,15 +258,15 @@ class TaskPlanner(object):
     """Increment the failure count of a process, and reset it to runnable if maximum number of
     failures has not been reached, or mark it as failed otherwise (ephemeral processes do not
     count towards the success of a task, and are hence marked finished instead)"""
-    timestamp = timestamp if timestamp is not None else self._clock.time()
-    self._last_terminal[process] = timestamp
+    self._record_termination_time(process, timestamp)
     self._failures[process] += 1
-    self.failure_transition(process)
+    self._failure_transition(process)
 
   def has_reached_run_limit(self, process):
-    return (self._successes[process] + self._failures[process]) >= self.TOTAL_RUN_LIMIT
+    runs = self._successes[process] + self._failures[process] + self._losses[process]
+    return runs >= self.TOTAL_RUN_LIMIT
 
-  def failure_transition(self, process):
+  def _failure_transition(self, process):
     if self.has_reached_run_limit(process):
       self._planner.set_failed(process)
       return
@@ -276,12 +281,11 @@ class TaskPlanner(object):
 
   def add_success(self, process, timestamp=None):
     """Reset a process to runnable if it is a daemon, or mark it as finished otherwise."""
-    timestamp = timestamp if timestamp is not None else self._clock.time()
-    self._last_terminal[process] = timestamp
+    self._record_termination_time(process, timestamp)
     self._successes[process] += 1
-    self.success_transition(process)
+    self._success_transition(process)
 
-  def success_transition(self, process):
+  def _success_transition(self, process):
     if self.has_reached_run_limit(process):
       self._planner.set_failed(process)
       return
@@ -295,9 +299,21 @@ class TaskPlanner(object):
     """Force a process to be in failed state.  E.g. kill -9 and you want it pinned failed."""
     self._planner.set_failed(process)
 
-  def lost(self, process):
-    """Mark a process as lost.  This sets its runnable state back to the previous runnable
-       state and does not increment its failure count."""
+  def lost(self, process, timestamp=None):
+    """Mark a process as lost. This sets its runnable state back to the previous runnable
+       state and does not increment the failure count of the process.
+
+       In order to prevent Thermos from overloading itself, even restarts of lost processes will be
+       throttled by the `min_duration` uptime and capped at TOTAL_RUN_LIMIT."""
+    self._record_termination_time(process, timestamp)
+    self._losses[process] += 1
+    self._lost_transition(process)
+
+  def _lost_transition(self, process):
+    if self.has_reached_run_limit(process):
+      self._planner.set_failed(process)
+      return
+
     self._planner.reset(process)
 
   def is_complete(self):

http://git-wip-us.apache.org/repos/asf/aurora/blob/13055df3/src/test/python/apache/thermos/common/test_task_planner.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/common/test_task_planner.py b/src/test/python/apache/thermos/common/test_task_planner.py
index 132c1ec..b56ef84 100644
--- a/src/test/python/apache/thermos/common/test_task_planner.py
+++ b/src/test/python/apache/thermos/common/test_task_planner.py
@@ -287,8 +287,8 @@ def test_task_lost():
   p.add_failure('d1', timestamp=1)
   assert approx_equal(p.min_wait(timestamp=1), 1)
   p.set_running('d1')
-  p.lost('d1')
-  assert approx_equal(p.min_wait(timestamp=1), 1)
+  p.lost('d1', timestamp=2)
+  assert approx_equal(p.min_wait(timestamp=2), 1)
   p.set_running('d1')
   p.add_failure('d1', timestamp=3)
   assert p.min_wait(timestamp=3) == TaskPlanner.INFINITY
@@ -316,7 +316,7 @@ def test_task_filters():
 
 def test_task_max_runs():
   class CappedTaskPlanner(TaskPlanner):
-    TOTAL_RUN_LIMIT = 2
+    TOTAL_RUN_LIMIT = 3
   dt = p1(daemon=True, max_failures=0)
 
   p = CappedTaskPlanner(empty_task(processes=[dt(name='d1', max_failures=100, daemon=False)]))
@@ -324,7 +324,10 @@ def test_task_max_runs():
   p.add_failure('d1', timestamp=1)
   assert 'd1' in p.runnable
   p.set_running('d1')
-  p.add_failure('d1', timestamp=2)
+  p.lost('d1', timestamp=2)
+  assert 'd1' in p.runnable
+  p.set_running('d1')
+  p.add_failure('d1', timestamp=3)
   assert 'd1' not in p.runnable
 
   p = CappedTaskPlanner(empty_task(processes=[dt(name='d1', max_failures=100)]))
@@ -333,4 +336,7 @@ def test_task_max_runs():
   assert 'd1' in p.runnable
   p.set_running('d1')
   p.add_success('d1', timestamp=2)
+  assert 'd1' in p.runnable
+  p.set_running('d1')
+  p.lost('d1', timestamp=3)
   assert 'd1' not in p.runnable

Reply via email to