Repository: aurora
Updated Branches:
  refs/heads/master 03eb33799 -> a3f8aef6b


Introduce mesos disk collector

When disk isolation is enabled in a Mesos agent, the agent calculates the disk
usage for each container.
Thermos Observer also monitors disk usage using `twitter.common.dirutil`,
essentially repeating the work already done by the agent. In practice, disk
monitoring is one of the most expensive resource monitoring tasks: with deeply
nested directories, the CPU utilization of the observer process can easily
reach 1.5 CPUs. It would be better to delegate disk monitoring to the agent and
do the work only once. With this approach, when disk collection improves in the
agent (for instance by implementing XFS isolation), we benefit from it without
any code change. More information about the problem is provided in AURORA-1918.

This patch introduces `MesosDiskCollector`, which queries the agent's API
endpoint to look up `disk_used_bytes`. Note that there is also resource
monitoring in the Thermos executor; for now, the disk collector there still
uses the `du` implementation. That can be changed in a later patch.
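
For illustration, here is a standalone sketch of the kind of lookup
`MesosDiskCollector` performs: fetch the agent's `/containers` endpoint and
pick out `statistics.disk_used_bytes` for the container whose `executor_id`
matches the sandbox path. The helper name and the hard-coded localhost URL are
only for this example; the defaults and the matching logic mirror the code
added to `disk.py` below.

```
import requests
from jmespath import compile as jmespath_compile

# Defaults as introduced by this patch (see DiskCollectorSettings).
AGENT_CONTAINERS_URL = "http://localhost:5051/containers"
EXECUTOR_ID_EXPR = jmespath_compile("executor_id")
DISK_USED_EXPR = jmespath_compile("statistics.disk_used_bytes")


def disk_used_bytes(sandbox_path, timeout_secs=5):
    """Return disk usage for the container owning sandbox_path, or -1 on error."""
    try:
        resp = requests.get(AGENT_CONTAINERS_URL, timeout=timeout_secs)
        resp.raise_for_status()
        containers = resp.json()
    except requests.exceptions.RequestException:
        return -1
    # The sandbox path embeds the executor id, so match on substring.
    matches = [c for c in containers
               if str(EXECUTOR_ID_EXPR.search(c)) in sandbox_path]
    if len(matches) != 1:
        return -1
    value = DISK_USED_EXPR.search(matches[0])
    return value if value is not None else -1
```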

I modified some vagrant config files, including `aurora-executor.service` and
`etc_mesos-slave/isolation`, for testing. They can be left as is; I included
them in this patch to show how this works end to end.
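
For reference, the end-to-end vagrant setup (visible in the diff below) comes
down to enabling the `disk/du` isolator on the agent and passing the new
observer flags:

```
# examples/vagrant/mesos_config/etc_mesos-slave/isolation
filesystem/linux,disk/du,docker/runtime

# examples/vagrant/systemd/thermos.service (new observer flags)
--enable_mesos_disk_collector
--agent_api_url=http://192.168.33.7:5051/containers
```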

Testing Done:
- I added unit tests.
- Tested in vagrant and it works as intended.
- I also built and deployed in our test environment. To measure the improved
performance I created jobs with nested folders and observed a reduction in CPU
utilization of the Observer process of at least 60% (from 1.5 CPU cores to
0.4 CPU cores).

Here is one specific test setup: on two hosts I created two tasks. Each task
creates identical nested directory structures and files in them; the overall
size is 30GB. test_host_1 runs the current version of the Observer and
test_host_2 runs the Observer with this patch and also has
mesos_disk_collection enabled. The results are as follows:

```
rezam[7]TEST_HOST_1 ~ $ while true; do echo `date`; curl localhost:1338/vars -s 
| grep cpu; sleep 10; done
Thu Mar 22 04:36:17 UTC 2018
observer.observer_cpu 108.9
Thu Mar 22 04:36:27 UTC 2018
observer.observer_cpu 123.2
Thu Mar 22 04:36:38 UTC 2018
observer.observer_cpu 123.2
Thu Mar 22 04:36:48 UTC 2018
observer.observer_cpu 123.2
Thu Mar 22 04:36:58 UTC 2018
observer.observer_cpu 111.0
Thu Mar 22 04:37:08 UTC 2018
observer.observer_cpu 111.0
Thu Mar 22 04:37:18 UTC 2018
observer.observer_cpu 111.0

rezam[7]TEST_HOST_2 ~ $ while true; do echo `date`; curl localhost:1338/vars -s 
| grep cpu; sleep 10; done
Thu Mar 22 04:36:20 UTC 2018
observer.observer_cpu 1.3
Thu Mar 22 04:36:30 UTC 2018
observer.observer_cpu 1.3
Thu Mar 22 04:36:40 UTC 2018
observer.observer_cpu 1.3
Thu Mar 22 04:36:50 UTC 2018
observer.observer_cpu 1.2
Thu Mar 22 04:37:00 UTC 2018
observer.observer_cpu 1.2
Thu Mar 22 04:37:10 UTC 2018
observer.observer_cpu 1.2
Thu Mar 22 04:37:20 UTC 2018
observer.observer_cpu 1.8
```

Reviewed at https://reviews.apache.org/r/66103/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a3f8aef6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a3f8aef6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a3f8aef6

Branch: refs/heads/master
Commit: a3f8aef6b4dcc13c79d60fc1ce02bcfcdb6e097d
Parents: 03eb337
Author: Reza Motamedi <[email protected]>
Authored: Mon Mar 26 13:47:13 2018 -0700
Committer: Santhosh Kumar <[email protected]>
Committed: Mon Mar 26 13:47:13 2018 -0700

----------------------------------------------------------------------
 3rdparty/python/requirements.txt                |   2 +
 RELEASE-NOTES.md                                |   5 +
 docs/reference/observer-configuration.md        |  14 +
 examples/jobs/hello_world.aurora                |   2 +-
 .../mesos_config/etc_mesos-slave/isolation      |   2 +-
 examples/vagrant/systemd/thermos.service        |   2 +
 .../apache/aurora/tools/thermos_observer.py     |  67 +++-
 src/main/python/apache/thermos/monitoring/BUILD |   1 +
 .../python/apache/thermos/monitoring/disk.py    | 176 ++++++++--
 .../apache/thermos/monitoring/resource.py       |  43 ++-
 .../apache/thermos/observer/task_observer.py    |  30 +-
 .../common/test_resource_manager_integration.py |   8 +-
 src/test/python/apache/thermos/monitoring/BUILD |   6 +
 .../apache/thermos/monitoring/test_disk.py      | 348 ++++++++++++++++++-
 .../apache/thermos/monitoring/test_resource.py  |  12 +
 15 files changed, 643 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/3rdparty/python/requirements.txt
----------------------------------------------------------------------
diff --git a/3rdparty/python/requirements.txt b/3rdparty/python/requirements.txt
index 4ac242c..c57f5fc 100644
--- a/3rdparty/python/requirements.txt
+++ b/3rdparty/python/requirements.txt
@@ -19,6 +19,8 @@ chardet==3.0.4
 CherryPy==8.0.0
 enum34==1.1.6
 idna==2.6
+httpretty==0.8.14
+jmespath==0.7.1
 mako==1.0.4
 mock==1.0.1
 mox==0.5.3

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 51ab6c7..8e43760 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -6,6 +6,11 @@
   `JobConfiguration.TaskConfig.ExecutorConfig`. This allows for using custom 
executors defined
   through the `--custom_executor_config` scheduler flag. See our
   [custom-executors](docs/features/custom-executors.md) documentation for more 
information.
+- Added support in Thermos Observer for delegating disk usage monitoring to the Mesos agent. The
+  feature can be enabled via the `--enable_mesos_disk_collector` flag, in which case the Observer
+  will use the agent's containers HTTP API to query the number of bytes used by each container.
+  Note that disk isolation should be enabled in the Mesos agent. This feature is not compatible
+  with agents that have authentication enabled.
 
 0.20.0
 ===================

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/docs/reference/observer-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/observer-configuration.md 
b/docs/reference/observer-configuration.md
index 8a443c9..c791b34 100644
--- a/docs/reference/observer-configuration.md
+++ b/docs/reference/observer-configuration.md
@@ -29,6 +29,20 @@ Options:
   --task_disk_collection_interval_secs=TASK_DISK_COLLECTION_INTERVAL_SECS
                         The number of seconds between per task disk resource
                         collections. [default: 60]
+  --enable_mesos_disk_collector
+                        Delegate per task disk usage collection to the agent.
+                        Should be enabled in conjunction with disk isolation
+                        in the Mesos agent. This is not compatible with an
+                        authenticated agent API. [default: False]
+  --agent_api_url=AGENT_API_URL
+                        Mesos Agent API url. [default:
+                        http://localhost:5051/containers]
+  --executor_id_json_path=EXECUTOR_ID_JSON_PATH
+                        `jmespath` to executor_id key in agent response json
+                        object. [default: executor_id]
+  --disk_usage_json_path=DISK_USAGE_JSON_PATH
+                        `jmespath` to disk usage bytes value in agent response
+                        json object. [default: statistics.disk_used_bytes]
 
   From module twitter.common.app:
     --app_daemonize     Daemonize this application. [default: False]

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/examples/jobs/hello_world.aurora
----------------------------------------------------------------------
diff --git a/examples/jobs/hello_world.aurora b/examples/jobs/hello_world.aurora
index 5401bfe..abd01ab 100644
--- a/examples/jobs/hello_world.aurora
+++ b/examples/jobs/hello_world.aurora
@@ -26,4 +26,4 @@ task = SequentialTask(
   resources = Resources(cpu = 1.0, ram = 128*MB, disk = 128*MB))
 
 jobs = [Service(
-  task = task, cluster = 'devcluster', role = 'www-data', environment = 
'prod', name = 'hello')]
+  task = task, cluster = 'devcluster', role = 'www-data', environment = 
'prod', name = 'hello')]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/examples/vagrant/mesos_config/etc_mesos-slave/isolation
----------------------------------------------------------------------
diff --git a/examples/vagrant/mesos_config/etc_mesos-slave/isolation 
b/examples/vagrant/mesos_config/etc_mesos-slave/isolation
index 1a7028f..556f040 100644
--- a/examples/vagrant/mesos_config/etc_mesos-slave/isolation
+++ b/examples/vagrant/mesos_config/etc_mesos-slave/isolation
@@ -1 +1 @@
-filesystem/linux,docker/runtime
+filesystem/linux,disk/du,docker/runtime

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/examples/vagrant/systemd/thermos.service
----------------------------------------------------------------------
diff --git a/examples/vagrant/systemd/thermos.service 
b/examples/vagrant/systemd/thermos.service
index 01925bc..a43edaa 100644
--- a/examples/vagrant/systemd/thermos.service
+++ b/examples/vagrant/systemd/thermos.service
@@ -22,6 +22,8 @@ SyslogIdentifier=thermos-observer
 ExecStart=/home/vagrant/aurora/dist/thermos_observer.pex \
   --ip=192.168.33.7 \
   --port=1338 \
+  --enable_mesos_disk_collector \
+  --agent_api_url=http://192.168.33.7:5051/containers \
   --log_to_disk=NONE \
   --log_to_stderr=google:INFO
 User=root

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/main/python/apache/aurora/tools/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py 
b/src/main/python/apache/aurora/tools/thermos_observer.py
index dd9f0c4..fd9465d 100644
--- a/src/main/python/apache/aurora/tools/thermos_observer.py
+++ b/src/main/python/apache/aurora/tools/thermos_observer.py
@@ -23,6 +23,7 @@ from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.path_detector import MesosPathDetector
 from apache.thermos.common.excepthook import ExceptionTerminationHandler
+from apache.thermos.monitoring.disk import DiskCollectorSettings
 from apache.thermos.monitoring.resource import TaskResourceMonitor
 from apache.thermos.observer.http.configure import configure_server
 from apache.thermos.observer.task_observer import TaskObserver
@@ -53,26 +54,59 @@ app.add_option(
 
 app.add_option(
     '--polling_interval_secs',
-      dest='polling_interval_secs',
-      type='int',
-      default=int(TaskObserver.POLLING_INTERVAL.as_(Time.SECONDS)),
-      help='The number of seconds between observer refresh attempts.')
+    dest='polling_interval_secs',
+    type='int',
+    default=int(TaskObserver.POLLING_INTERVAL.as_(Time.SECONDS)),
+    help='The number of seconds between observer refresh attempts.')
 
 
 app.add_option(
     '--task_process_collection_interval_secs',
-      dest='task_process_collection_interval_secs',
-      type='int',
-      
default=int(TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL.as_(Time.SECONDS)),
-      help='The number of seconds between per task process resource 
collections.')
+    dest='task_process_collection_interval_secs',
+    type='int',
+    
default=int(TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+    help='The number of seconds between per task process resource 
collections.')
 
 
 app.add_option(
     '--task_disk_collection_interval_secs',
-      dest='task_disk_collection_interval_secs',
-      type='int',
-      
default=int(TaskResourceMonitor.DISK_COLLECTION_INTERVAL.as_(Time.SECONDS)),
-      help='The number of seconds between per task disk resource collections.')
+    dest='task_disk_collection_interval_secs',
+    type='int',
+    
default=int(DiskCollectorSettings.DISK_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+    help='The number of seconds between per task disk resource collections.')
+
+
+app.add_option(
+    '--enable_mesos_disk_collector',
+    dest='enable_mesos_disk_collector',
+    default=False,
+    action='store_true',
+    help="Delegate per task disk usage collection to the agent. Should be enabled in conjunction "
+         "with disk isolation in the Mesos agent. This is not compatible with an authenticated agent API.")
+
+
+app.add_option(
+    '--agent_api_url',
+    dest='agent_api_url',
+    type='string',
+    default=DiskCollectorSettings.DEFAULT_AGENT_CONTAINERS_ENDPOINT,
+    help='Mesos Agent API url.')
+
+
+app.add_option(
+    '--executor_id_json_path',
+    dest='executor_id_json_path',
+    type='string',
+    default=DiskCollectorSettings.DEFAULT_EXECUTOR_ID_PATH,
+    help='`jmespath` to executor_id key in agent response json object.')
+
+
+app.add_option(
+    '--disk_usage_json_path',
+    dest='disk_usage_json_path',
+    type='string',
+    default=DiskCollectorSettings.DEFAULT_DISK_USAGE_PATH,
+    help='`jmespath` to disk usage bytes value in agent response json object.')
 
 
 # Allow an interruptible sleep so that ^C works.
@@ -83,11 +117,18 @@ def sleep_forever():
 
 def initialize(options):
   path_detector = MesosPathDetector(options.mesos_root)
+  disk_collector_settings = DiskCollectorSettings(
+      options.agent_api_url,
+      options.executor_id_json_path,
+      options.disk_usage_json_path,
+      Amount(options.task_disk_collection_interval_secs, Time.SECONDS))
+
   return TaskObserver(
       path_detector,
       Amount(options.polling_interval_secs, Time.SECONDS),
       Amount(options.task_process_collection_interval_secs, Time.SECONDS),
-      Amount(options.task_disk_collection_interval_secs, Time.SECONDS))
+      enable_mesos_disk_collector=options.enable_mesos_disk_collector,
+      disk_collector_settings=disk_collector_settings)
 
 
 def main(_, options):

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/main/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/BUILD 
b/src/main/python/apache/thermos/monitoring/BUILD
index 65ba708..b3422cd 100644
--- a/src/main/python/apache/thermos/monitoring/BUILD
+++ b/src/main/python/apache/thermos/monitoring/BUILD
@@ -19,6 +19,7 @@ python_library(
   name = 'monitoring',
   sources = rglobs('*.py'),
   dependencies = [
+    '3rdparty/python:jmespath',
     '3rdparty/python:psutil',
     '3rdparty/python:twitter.common.collections',
     '3rdparty/python:twitter.common.concurrent',

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/main/python/apache/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/disk.py 
b/src/main/python/apache/thermos/monitoring/disk.py
index 986d33a..b06ed07 100644
--- a/src/main/python/apache/thermos/monitoring/disk.py
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -21,64 +21,182 @@ under a particular path.
 import threading
 import time
 
+import requests
+from jmespath import compile
 from twitter.common import log
 from twitter.common.dirutil import du
 from twitter.common.exceptions import ExceptionalThread
-from twitter.common.lang import Lockable
+from twitter.common.lang import AbstractClass, Lockable
+from twitter.common.quantity import Amount, Time
 
 
-class DiskCollectorThread(ExceptionalThread):
+class AbstractDiskCollector(Lockable, AbstractClass):
+  def __init__(self, root, settings=None):
+    self._settings = settings
+    self._root = root
+    self._thread = None
+    self._value = 0
+    super(AbstractDiskCollector, self).__init__()
+
+  @property
+  @Lockable.sync
+  def value(self):
+    """ Retrieve value of disk usage """
+    if self._thread is not None and self._thread.finished():
+      self._value = self._thread.value
+      self._thread = None
+    return self._value
+
+  @property
+  @Lockable.sync
+  def completed_event(self):
+    """ Return a threading.Event that will block until an in-progress disk 
collection is complete,
+    or block indefinitely otherwise. Use with caution! (i.e.: set a timeout) 
"""
+    if self._thread is not None:
+      return self._thread.event
+    else:
+      return threading.Event()
+
+
+class DuDiskCollectorThread(ExceptionalThread):
   """ Thread to calculate aggregate disk usage under a given path using a 
simple algorithm """
 
   def __init__(self, path):
-    self.path = path
     self.value = None
     self.event = threading.Event()
-    super(DiskCollectorThread, self).__init__()
+    self._path = path
+    super(DuDiskCollectorThread, self).__init__()
     self.daemon = True
 
   def run(self):
     start = time.time()
-    self.value = du(self.path)
-    log.debug("DiskCollectorThread: finished collection of %s in %.1fms",
-        self.path, 1000.0 * (time.time() - start))
+    self.value = du(self._path)
+    log.debug("DuDiskCollectorThread: finished collection of %s in %.1fms",
+              self._path, 1000.0 * (time.time() - start))
     self.event.set()
 
   def finished(self):
     return self.event.is_set()
 
 
-class DiskCollector(Lockable):
+class DuDiskCollector(AbstractDiskCollector):
   """ Spawn a background thread to sample disk usage """
 
-  def __init__(self, root):
-    self._root = root
-    self._thread = None
-    self._value = 0
-    super(DiskCollector, self).__init__()
+  @Lockable.sync
+  def sample(self):
+    """ Trigger collection of sample, if not already begun """
+    if self._thread is None:
+      self._thread = DuDiskCollectorThread(self._root)
+      self._thread.start()
+
+
+class MesosDiskCollectorClient(ExceptionalThread):
+  """ Thread to look up disk usage under a given path from the Mesos agent """
+
+  DEFAULT_ERROR_VALUE = -1  # -1B
+
+  def __init__(self, path, settings):
+    self.value = None
+    self.event = threading.Event()
+    self._url = settings.http_api_url
+    self._request_timeout = settings.disk_collection_timeout.as_(Time.SECONDS)
+    self._path = path
+    self._executor_key_expression = settings.executor_id_json_expression
+    self._disk_usage_value_expression = settings.disk_usage_json_expression
+    super(MesosDiskCollectorClient, self).__init__()
+    self.daemon = True
+
+  def run(self):
+    start = time.time()
+    response = self._request_agent_containers()
+    filtered_container_stats = [
+      container
+      for container in response
+      if str(self._executor_key_expression.search(container)) in self._path]
+
+    if len(filtered_container_stats) != 1:
+      self.value = self.DEFAULT_ERROR_VALUE
+      log.warn("MesosDiskCollector: Didn't find container stats for path %s in 
agent metrics.",
+               self._path)
+    else:
+      self.value = 
self._disk_usage_value_expression.search(filtered_container_stats[0])
+      if self.value is None:
+        self.value = self.DEFAULT_ERROR_VALUE
+        log.warn("MesosDiskCollector: Didn't find disk usage stats for path %s 
in agent metrics.",
+                 self._path)
+      else:
+        log.debug("MesosDiskCollector: finished collection of %s in %.1fms",
+                  self._path, 1000.0 * (time.time() - start))
+
+    self.event.set()
+
+  def _request_agent_containers(self):
+    try:
+      resp = requests.get(self._url, timeout=self._request_timeout)
+      resp.raise_for_status()
+      return resp.json()
+    except requests.exceptions.RequestException as ex:
+      log.warn("MesosDiskCollector: Unexpected error talking to agent api: 
%s", ex)
+      return []
+
+  def finished(self):
+    return self.event.is_set()
+
+
+class MesosDiskCollector(AbstractDiskCollector):
+  """ Spawn a background thread to look up disk usage under a path from the Mesos agent """
 
   @Lockable.sync
   def sample(self):
     """ Trigger collection of sample, if not already begun """
     if self._thread is None:
-      self._thread = DiskCollectorThread(self._root)
+      self._thread = MesosDiskCollectorClient(self._root, self._settings)
       self._thread.start()
 
+
+class DiskCollectorSettings(object):
+  """ Data container class to store Mesos agent API settings needed to retrieve disk usage """
+
+  DEFAULT_AGENT_CONTAINERS_ENDPOINT = "http://localhost:5051/containers"
+  # Different versions of the Mesos agent format their responses differently. We use a JSON path
+  # library to allow custom navigation through the JSON response object.
+  # For documentation see: http://jmespath.org/tutorial.html
+  DEFAULT_EXECUTOR_ID_PATH = "executor_id"
+  DEFAULT_DISK_USAGE_PATH = "statistics.disk_used_bytes"
+  DEFAULT_DISK_COLLECTION_TIMEOUT = Amount(5, Time.SECONDS)
+  DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
+
+  def __init__(
+      self,
+      http_api_url=DEFAULT_AGENT_CONTAINERS_ENDPOINT,
+      executor_id_json_path=DEFAULT_EXECUTOR_ID_PATH,
+      disk_usage_json_path=DEFAULT_DISK_USAGE_PATH,
+      disk_collection_timeout=DEFAULT_DISK_COLLECTION_TIMEOUT,
+      disk_collection_interval=DISK_COLLECTION_INTERVAL):
+
+    self._http_api_url = http_api_url
+    # We compile the JMESPath expressions here for speed and also to detect bad paths immediately
+    self._executor_id_json_expression = compile(executor_id_json_path)
+    self._disk_usage_json_expression = compile(disk_usage_json_path)
+    self._disk_collection_interval = disk_collection_interval
+    self._disk_collection_timeout = disk_collection_timeout
+
   @property
-  @Lockable.sync
-  def value(self):
-    """ Retrieve value of disk usage """
-    if self._thread is not None and self._thread.finished():
-      self._value = self._thread.value
-      self._thread = None
-    return self._value
+  def http_api_url(self):
+    return self._http_api_url
 
   @property
-  @Lockable.sync
-  def completed_event(self):
-    """ Return a threading.Event that will block until an in-progress disk 
collection is complete,
-    or block indefinitely otherwise. Use with caution! (i.e.: set a timeout) 
"""
-    if self._thread is not None:
-      return self._thread.event
-    else:
-      return threading.Event()
+  def executor_id_json_expression(self):
+    return self._executor_id_json_expression
+
+  @property
+  def disk_usage_json_expression(self):
+    return self._disk_usage_json_expression
+
+  @property
+  def disk_collection_interval(self):
+    return self._disk_collection_interval
+
+  @property
+  def disk_collection_timeout(self):
+    return self._disk_collection_timeout

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py 
b/src/main/python/apache/thermos/monitoring/resource.py
index adcdc75..72ed4e5 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -41,7 +41,7 @@ from twitter.common.exceptions import ExceptionalThread
 from twitter.common.lang import Interface
 from twitter.common.quantity import Amount, Time
 
-from .disk import DiskCollector
+from .disk import DiskCollectorSettings, DuDiskCollector, MesosDiskCollector
 from .process import ProcessSample
 from .process_collector_psutil import ProcessTreeCollector
 
@@ -141,6 +141,23 @@ class HistoryProvider(object):
     return ResourceHistory(history_length)
 
 
+class DiskCollectorProvider(object):
+  DEFAULT_DISK_COLLECTOR_CLASS = DuDiskCollector
+
+  def __init__(
+      self,
+      enable_mesos_disk_collector=False,
+      settings=DiskCollectorSettings()):
+
+    self.settings = settings
+    self.disk_collector_class = self.DEFAULT_DISK_COLLECTOR_CLASS
+    if enable_mesos_disk_collector:
+      self.disk_collector_class = MesosDiskCollector
+
+  def provides(self, sandbox):
+    return self.disk_collector_class(sandbox, settings=self.settings)
+
+
 class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
   """ Lightweight thread to aggregate resource consumption for a task's 
constituent processes.
       Actual resource calculation is delegated to collectors; this class 
periodically polls the
@@ -149,17 +166,18 @@ class TaskResourceMonitor(ResourceMonitorBase, 
ExceptionalThread):
   """
 
   PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
-  DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
   HISTORY_TIME = Amount(1, Time.HOURS)
 
-  def __init__(self,
-               task_id,
-               task_monitor,
-               disk_collector=DiskCollector,
-               process_collection_interval=PROCESS_COLLECTION_INTERVAL,
-               disk_collection_interval=DISK_COLLECTION_INTERVAL,
-               history_time=HISTORY_TIME,
-               history_provider=HistoryProvider()):
+  def __init__(
+      self,
+      task_id,
+      task_monitor,
+      disk_collector_provider=DiskCollectorProvider(),
+      process_collection_interval=PROCESS_COLLECTION_INTERVAL,
+      disk_collection_interval=DiskCollectorSettings.DISK_COLLECTION_INTERVAL,
+      history_time=HISTORY_TIME,
+      history_provider=HistoryProvider()):
+
     """
       task_monitor: TaskMonitor object specifying the task whose resources 
should be monitored
       sandbox: Directory for which to monitor disk utilisation
@@ -168,7 +186,8 @@ class TaskResourceMonitor(ResourceMonitorBase, 
ExceptionalThread):
     self._task_id = task_id
     log.debug('Initialising resource collection for task %s', self._task_id)
     self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
-    self._disk_collector_class = disk_collector
+
+    self._disk_collector_provider = disk_collector_provider
     self._disk_collector = None
     self._process_collection_interval = 
process_collection_interval.as_(Time.SECONDS)
     self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
@@ -248,7 +267,7 @@ class TaskResourceMonitor(ResourceMonitorBase, 
ExceptionalThread):
         if not self._disk_collector:
           sandbox = self._task_monitor.get_sandbox()
           if sandbox:
-            self._disk_collector = self._disk_collector_class(sandbox)
+            self._disk_collector = 
self._disk_collector_provider.provides(sandbox)
         if self._disk_collector:
           self._disk_collector.sample()
         else:

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py 
b/src/main/python/apache/thermos/observer/task_observer.py
index a6870d4..94cd6c5 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -30,9 +30,10 @@ from twitter.common.lang import Lockable
 from twitter.common.quantity import Amount, Time
 
 from apache.thermos.common.path import TaskPath
+from apache.thermos.monitoring.disk import DiskCollectorSettings
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
-from apache.thermos.monitoring.resource import TaskResourceMonitor
+from apache.thermos.monitoring.resource import DiskCollectorProvider, 
TaskResourceMonitor
 
 from .detector import ObserverTaskDetector
 from .observed_task import ActiveObservedTask, FinishedObservedTask
@@ -54,11 +55,14 @@ class TaskObserver(ExceptionalThread, Lockable):
 
   POLLING_INTERVAL = Amount(5, Time.SECONDS)
 
-  def __init__(self,
-               path_detector,
-               interval=POLLING_INTERVAL,
-               
task_process_collection_interval=TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL,
-               
task_disk_collection_interval=TaskResourceMonitor.DISK_COLLECTION_INTERVAL):
+  def __init__(
+      self,
+      path_detector,
+      interval=POLLING_INTERVAL,
+      
task_process_collection_interval=TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL,
+      enable_mesos_disk_collector=False,
+      disk_collector_settings=DiskCollectorSettings()):
+
     self._detector = ObserverTaskDetector(
         path_detector,
         self.__on_active,
@@ -66,7 +70,8 @@ class TaskObserver(ExceptionalThread, Lockable):
         self.__on_removed)
     self._interval = interval
     self._task_process_collection_interval = task_process_collection_interval
-    self._task_disk_collection_interval = task_disk_collection_interval
+    self._enable_mesos_disk_collector = enable_mesos_disk_collector
+    self._disk_collector_settings = disk_collector_settings
     self._active_tasks = {}    # task_id => ActiveObservedTask
     self._finished_tasks = {}  # task_id => FinishedObservedTask
     self._stop_event = threading.Event()
@@ -101,18 +106,23 @@ class TaskObserver(ExceptionalThread, Lockable):
       log.error('Found an active task (%s) in finished tasks?', task_id)
       return
     task_monitor = TaskMonitor(root, task_id)
+
+    disk_collector_provider = DiskCollectorProvider(
+      self._enable_mesos_disk_collector,
+      self._disk_collector_settings)
+
     resource_monitor = TaskResourceMonitor(
         task_id,
         task_monitor,
+        disk_collector_provider=disk_collector_provider,
         process_collection_interval=self._task_process_collection_interval,
-        disk_collection_interval=self._task_disk_collection_interval)
+        
disk_collection_interval=self._disk_collector_settings.disk_collection_interval)
     resource_monitor.start()
     self._active_tasks[task_id] = ActiveObservedTask(
         root,
         task_id,
         task_monitor,
-        resource_monitor
-    )
+        resource_monitor)
 
   def __on_finished(self, root, task_id):
     log.debug('on_finished(%r, %r)', root, task_id)

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
----------------------------------------------------------------------
diff --git 
a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
 
b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
index fe74bd1..7b67bdb 100644
--- 
a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
+++ 
b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
@@ -23,7 +23,7 @@ from apache.aurora.executor.common.resource_manager import 
ResourceManagerProvid
 from apache.aurora.executor.common.sandbox import DirectorySandbox
 from apache.thermos.common.path import TaskPath
 from apache.thermos.core.helper import TaskRunnerHelper
-from apache.thermos.monitoring.disk import DiskCollector
+from apache.thermos.monitoring.resource import DiskCollectorProvider
 
 from gen.apache.thermos.ttypes import RunnerCkpt, RunnerHeader
 
@@ -88,8 +88,8 @@ def test_resource_manager():
     root = os.path.join(td, 'thermos')
     write_header(root, sandbox, assigned_task.taskId)
 
-    mock_disk_collector_class = mock.create_autospec(DiskCollector, 
spec_set=True)
-    mock_disk_collector = mock_disk_collector_class.return_value
+    mock_disk_collector_provider = mock.create_autospec(DiskCollectorProvider, 
spec_set=True)
+    mock_disk_collector = mock_disk_collector_provider.provides.return_value
 
     mock_disk_collector.sample.return_value = None
     value_mock = mock.PropertyMock(return_value=4197)
@@ -99,7 +99,7 @@ def test_resource_manager():
     completed_mock = mock.PropertyMock(return_value=completed_event)
     type(mock_disk_collector).completed_event = completed_mock
 
-    rmp = ResourceManagerProvider(root, 
disk_collector=mock_disk_collector_class)
+    rmp = ResourceManagerProvider(root, 
disk_collector_provider=mock_disk_collector_provider)
     rm = rmp.from_assigned_task(assigned_task, DirectorySandbox(sandbox))
 
     assert rm.status is None

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/test/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/monitoring/BUILD 
b/src/test/python/apache/thermos/monitoring/BUILD
index 8f2b393..1682913 100644
--- a/src/test/python/apache/thermos/monitoring/BUILD
+++ b/src/test/python/apache/thermos/monitoring/BUILD
@@ -16,10 +16,16 @@ python_tests(
   name = 'monitoring',
   sources = globs('*.py'),
   dependencies = [
+    '3rdparty/python:certifi',
+    '3rdparty/python:chardet',
+    '3rdparty/python:httpretty',
+    '3rdparty/python:idna',
     '3rdparty/python:mock',
+    '3rdparty/python:requests',
     '3rdparty/python:twitter.common.contextutil',
     '3rdparty/python:twitter.common.dirutil',
     '3rdparty/python:twitter.common.quantity',
+    '3rdparty/python:urllib3',
     'src/main/python/apache/thermos/common',
     'src/main/python/apache/thermos/monitoring',
   ]

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/test/python/apache/thermos/monitoring/test_disk.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/monitoring/test_disk.py 
b/src/test/python/apache/thermos/monitoring/test_disk.py
index 362393b..e1ef6d9 100644
--- a/src/test/python/apache/thermos/monitoring/test_disk.py
+++ b/src/test/python/apache/thermos/monitoring/test_disk.py
@@ -12,20 +12,30 @@
 # limitations under the License.
 #
 
+import json
 import os
 from tempfile import mkstemp
+from time import sleep
 from unittest import TestCase
 
+import httpretty
+from requests import ConnectionError
 from twitter.common import dirutil
 from twitter.common.dirutil import safe_mkdtemp
-from twitter.common.quantity import Amount, Data
+from twitter.common.quantity import Amount, Data, Time
 
-from apache.thermos.monitoring.disk import DiskCollector
+from apache.thermos.monitoring.disk import (
+    DiskCollectorSettings,
+    DuDiskCollector,
+    MesosDiskCollector
+)
 
 TEST_AMOUNT_1 = Amount(100, Data.MB)
 TEST_AMOUNT_2 = Amount(10, Data.MB)
 TEST_AMOUNT_SUM = TEST_AMOUNT_1 + TEST_AMOUNT_2
 
+LOOK_UP_ERROR_VALUE = -1
+
 
 def make_file(size, dir):
   _, filename = mkstemp(dir=dir)
@@ -59,10 +69,10 @@ def _run_collector_tests(collector, target, wait):
   assert TEST_AMOUNT_SUM.as_(Data.BYTES) > collector.value >= 
TEST_AMOUNT_2.as_(Data.BYTES)
 
 
-class TestDiskCollector(TestCase):
-  def test_du_diskcollector(self):
+class TestDuDiskCollector(TestCase):
+  def test_du_disk_collector(self):
     target = safe_mkdtemp()
-    collector = DiskCollector(target)
+    collector = DuDiskCollector(target)
 
     def wait():
       collector.sample()
@@ -70,3 +80,331 @@ class TestDiskCollector(TestCase):
         collector._thread.event.wait()
 
     _run_collector_tests(collector, target, wait)
+
+
+class TestMesosDiskCollector(TestCase):
+
+  def setUp(self):
+    self.agent_api_url = "http://localhost:5051/containers"
+    self.executor_id_json_path = "executor_id"
+    self.disk_usage_json_path = "statistics.disk_used_bytes"
+    self.collection_timeout = Amount(1, Time.SECONDS)
+    self.collection_interval = Amount(1, Time.SECONDS)
+
+    self.sandbox_path = "/var/lib/path/to/thermos-some-task-id"
+
+  @httpretty.activate
+  def test_mesos_disk_collector(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    first_json_body = json.dumps(
+      [
+        {
+          "executor_id": "thermos-some-task-id",
+          "statistics": {
+            "disk_used_bytes": 100
+          }
+        }
+      ]
+    )
+    second_json_body = json.dumps(
+      [
+        {
+          "executor_id": "thermos-some-task-id",
+          "statistics": {
+            "disk_used_bytes": 200
+          }
+        }
+      ]
+    )
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      responses=[
+        httpretty.Response(body=first_json_body, 
content_type='application/json'),
+        httpretty.Response(body=second_json_body, 
content_type='application/json')])
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    assert collector.value == 0
+
+    wait()
+    assert collector.value == 100
+
+    wait()
+    assert collector.value == 200
+
+    print (dir(httpretty.last_request()))
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_bad_api_path(self):
+    settings = DiskCollectorSettings(
+      http_api_url="http://localhost:5051/wrong_path",
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    json_body = json.dumps({"status": "bad_request"})
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=json_body,
+      content_type='application/json')
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    assert collector.value == 0
+
+    wait()
+    assert collector.value == LOOK_UP_ERROR_VALUE
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/wrong_path")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_unexpected_response_format(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    json_body = json.dumps({"status": "bad_request"})
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=json_body,
+      content_type='application/json')
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    assert collector.value == 0
+
+    wait()
+    assert collector.value == LOOK_UP_ERROR_VALUE
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_bad_executor_id_selector(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path="bad_path",
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    json_body = json.dumps(
+      [
+        {
+          "executor_id": "thermos-some-task-id",
+          "statistics": {
+            "disk_used_bytes": 100
+          }
+        }
+      ]
+    )
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=json_body,
+      content_type='application/json')
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    self.assertEquals(collector.value, 0)
+
+    wait()
+    self.assertEquals(collector.value, LOOK_UP_ERROR_VALUE)
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_bad_disk_usage_selector(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path="bad_path",
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    json_body = json.dumps(
+      [
+        {
+          "executor_id": "thermos-some-task-id",
+          "statistics": {
+            "disk_used_bytes": 100
+          }
+        }
+      ]
+    )
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      responses=[httpretty.Response(body=json_body, 
content_type='application/json')])
+
+    sandbox_path = "/var/lib/path/to/thermos-some-task-id"
+    collector = MesosDiskCollector(sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    self.assertEquals(collector.value, 0)
+
+    wait()
+    self.assertEquals(collector.value, LOOK_UP_ERROR_VALUE)
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_when_unauthorized(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval)
+
+    json_body = json.dumps(
+      [
+        {
+          "executor_id": "thermos-some-task-id",
+          "statistics": {
+            "disk_used_bytes": 100
+          }
+        }
+      ]
+    )
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=json_body,
+      status=401)
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    self.assertEquals(collector.value, 0)
+
+    wait()
+    self.assertEquals(collector.value, LOOK_UP_ERROR_VALUE)
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_when_connection_error(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=self.collection_timeout,
+      disk_collection_interval=self.collection_interval
+    )
+
+    def exception_callback(request, uri, headers):
+      raise ConnectionError
+
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=exception_callback
+    )
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    self.assertEquals(collector.value, 0)
+
+    wait()
+    self.assertEquals(collector.value, LOOK_UP_ERROR_VALUE)
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")
+
+  @httpretty.activate
+  def test_mesos_disk_collector_timeout(self):
+    settings = DiskCollectorSettings(
+      http_api_url=self.agent_api_url,
+      executor_id_json_path=self.executor_id_json_path,
+      disk_usage_json_path=self.disk_usage_json_path,
+      disk_collection_timeout=Amount(10, Time.MILLISECONDS),
+      disk_collection_interval=self.collection_interval
+    )
+
+    def callback(request, uri, headers):
+      json_body = json.dumps(
+        [
+          {
+            "executor_id": "thermos-some-task-id",
+            "statistics": {
+              "disk_used_bytes": 100
+            }
+          }
+        ]
+      )
+      sleep(5)
+      return (200, headers, json_body)
+
+    httpretty.register_uri(
+      method=httpretty.GET,
+      uri="http://localhost:5051/containers",
+      body=callback
+    )
+
+    collector = MesosDiskCollector(self.sandbox_path, settings)
+
+    def wait():
+      collector.sample()
+      if collector._thread is not None:
+        collector._thread.event.wait()
+
+    self.assertEquals(collector.value, 0)
+
+    wait()
+    self.assertEquals(collector.value, LOOK_UP_ERROR_VALUE)
+
+    self.assertEquals(httpretty.last_request().method, "GET")
+    self.assertEquals(httpretty.last_request().path, "/containers")

http://git-wip-us.apache.org/repos/asf/aurora/blob/a3f8aef6/src/test/python/apache/thermos/monitoring/test_resource.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/monitoring/test_resource.py 
b/src/test/python/apache/thermos/monitoring/test_resource.py
index e577e55..4445064 100644
--- a/src/test/python/apache/thermos/monitoring/test_resource.py
+++ b/src/test/python/apache/thermos/monitoring/test_resource.py
@@ -19,9 +19,11 @@ import mock
 import pytest
 from twitter.common.quantity import Amount, Time
 
+from apache.thermos.monitoring.disk import DuDiskCollector, MesosDiskCollector
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
 from apache.thermos.monitoring.resource import (
+    DiskCollectorProvider,
     HistoryProvider,
     ResourceHistory,
     ResourceMonitorBase,
@@ -37,6 +39,16 @@ class TestResourceHistoryProvider(TestCase):
       HistoryProvider().provides(Amount(1, Time.DAYS), 1)
 
 
+class TestDiskCollectorProvider(TestCase):
+  def test_default_collector_class(self):
+    assert isinstance(DiskCollectorProvider().provides("some_path"), 
DuDiskCollector)
+
+  def test_mesos_collector_class(self):
+    assert isinstance(
+      
DiskCollectorProvider(enable_mesos_disk_collector=True).provides("some_path"),
+      MesosDiskCollector)
+
+
 class TestResourceHistory(TestCase):
   def setUp(self):
     self.max_len = 4
