Repository: aurora
Updated Branches:
  refs/heads/master 795a2728c -> 633948ab0


Aurora admin commands for reconciliation

* A new command for task reconciliation, `reconcile_tasks`, was added to the
  aurora_admin CLI. It takes the type of reconciliation and the batch size (for
  explicit reconciliation) as options.
* As part of this change, two Thrift APIs were also added:
  `triggerImplicitTaskReconciliation` and `triggerExplicitTaskReconciliation`.

Testing Done:
* Manually tested on my local vagrant installation.
* ./build-support/jenkins/build.sh

Bugs closed: AURORA-1602

Reviewed at https://reviews.apache.org/r/51662/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/633948ab
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/633948ab
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/633948ab

Branch: refs/heads/master
Commit: 633948ab07b9e794a828cb35e9e75bb19d20f061
Parents: 795a272
Author: Karthik Anantha Padmanabhan <[email protected]>
Authored: Tue Sep 13 13:02:59 2016 -0700
Committer: Zameer Manji <[email protected]>
Committed: Tue Sep 13 13:02:59 2016 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |  2 +
 .../thrift/org/apache/aurora/gen/api.thrift     | 10 +++
 .../reconciliation/TaskReconciler.java          | 63 +++++++++------
 .../thrift/SchedulerThriftInterface.java        | 33 +++++++-
 src/main/python/apache/aurora/admin/admin.py    | 27 +++++++
 .../python/apache/aurora/client/api/__init__.py |  8 ++
 .../reconciliation/TaskReconcilerTest.java      | 18 ++++-
 .../thrift/SchedulerThriftInterfaceTest.java    | 59 +++++++++++++-
 .../python/apache/aurora/admin/test_admin.py    | 82 +++++++++++++++++++-
 .../aurora/client/api/test_scheduler_client.py  | 13 ++++
 10 files changed, 284 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index b947c23..23e8168 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -34,6 +34,8 @@
   scheduler upgrade should be performed by bringing all schedulers down, and 
then bringing upgraded
   schedulers up. A rolling upgrade would result in no leading scheduler for 
the duration of the
   roll which could be confusing to monitor and debug.
+- A new command `aurora_admin reconcile_tasks` is now available on the Aurora 
admin client that can trigger
+  implicit and explicit task reconciliations.
 - Add a new MTTS (Median Time To Starting) metric in addition to MTTA and MTTR.
 - In addition to CPU resources, RAM resources can now be treated as revocable 
via the scheduler
   commandline flag `-enable_revocable_ram`.

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 3f7079d..ca014f3 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1150,6 +1150,10 @@ struct RewriteConfigsRequest {
   1: list<ConfigRewrite> rewriteCommands
 }
 
+struct ExplicitReconciliationSettings {
+  1: optional i32 batchSize
+}
+
 // It would be great to compose these services rather than extend, but that 
won't be possible until
 // https://issues.apache.org/jira/browse/THRIFT-66 is resolved.
 service AuroraAdmin extends AuroraSchedulerManager {
@@ -1209,6 +1213,12 @@ service AuroraAdmin extends AuroraSchedulerManager {
    * that the caller take care to provide valid input and alter only necessary 
fields.
    */
   Response rewriteConfigs(1: RewriteConfigsRequest request)
+
+  /** Tell scheduler to trigger an explicit task reconciliation with the given 
settings. */
+  Response triggerExplicitTaskReconciliation(1: ExplicitReconciliationSettings 
settings)
+
+  /** Tell scheduler to trigger an implicit task reconciliation. */
+  Response triggerImplicitTaskReconciliation()
 }
 
 // The name of the header that should be sent to bypass leader redirection in 
the Scheduler.

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java 
b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index 3275d72..343ffdb 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -21,6 +21,7 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -119,40 +120,58 @@ public class TaskReconciler extends AbstractIdleService {
     this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME);
   }
 
+  public void triggerExplicitReconciliation(Optional<Integer> batchSize) {
+    doExplicitReconcile(batchSize.or(settings.explicitBatchSize));
+  }
+
+  public void triggerImplicitReconciliation() {
+    doImplicitReconcile();
+  }
+
   @Override
   protected void startUp() {
-    // Schedule explicit reconciliation.
+    scheduleExplicitReconciliation();
+    scheduleImplicitReconciliation();
+  }
+
+  private void scheduleExplicitReconciliation() {
     executor.scheduleAtFixedRate(
-        () -> {
-          ImmutableList<TaskStatus> active = FluentIterable
-              .from(Storage.Util.fetchTasks(
-                  storage,
-                  Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
-              .transform(TASK_TO_PROTO)
-              .toList();
-
-          List<List<TaskStatus>> batches = Lists.partition(active, 
settings.explicitBatchSize);
-          long delay = 0;
-          for (List<TaskStatus> batch : batches) {
-            executor.schedule(() -> driver.reconcileTasks(batch), delay, 
SECONDS.getTimeUnit());
-            delay += settings.explicitBatchDelaySeconds;
-          }
-          explicitRuns.incrementAndGet();
-        },
+        () -> doExplicitReconcile(settings.explicitBatchSize),
         settings.explicitDelayMinutes,
         settings.explicitInterval.as(MINUTES),
         MINUTES.getTimeUnit());
-    // Schedule implicit reconciliation.
+  }
+
+  private void scheduleImplicitReconciliation() {
     executor.scheduleAtFixedRate(
-        () -> {
-          driver.reconcileTasks(ImmutableSet.of());
-          implicitRuns.incrementAndGet();
-        },
+        () -> doImplicitReconcile(),
         settings.implicitDelayMinutes,
         settings.implicitInterval.as(MINUTES),
         MINUTES.getTimeUnit());
   }
 
+  private void doImplicitReconcile() {
+    driver.reconcileTasks(ImmutableSet.of());
+    implicitRuns.incrementAndGet();
+  }
+
+  private void doExplicitReconcile(int batchSize) {
+    ImmutableList<TaskStatus> active = FluentIterable
+        .from(Storage.Util.fetchTasks(
+            storage,
+            Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
+        .transform(TASK_TO_PROTO)
+        .toList();
+
+    List<List<TaskStatus>> batches = Lists.partition(active, batchSize);
+    long delay = 0;
+    for (List<TaskStatus> batch : batches) {
+      executor.schedule(() -> driver.reconcileTasks(batch), delay, 
SECONDS.getTimeUnit());
+      delay += settings.explicitBatchDelaySeconds;
+    }
+    explicitRuns.incrementAndGet();
+  }
+
   @Override
   protected void shutDown() {
     // Nothing to do - await VM shutdown.

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index cd18d0d..26c45fd 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -23,6 +23,7 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.FluentIterable;
@@ -38,6 +39,7 @@ import com.google.common.collect.Range;
 import org.apache.aurora.gen.ConfigRewrite;
 import org.apache.aurora.gen.DrainHostsResult;
 import org.apache.aurora.gen.EndMaintenanceResult;
+import org.apache.aurora.gen.ExplicitReconciliationSettings;
 import org.apache.aurora.gen.Hosts;
 import org.apache.aurora.gen.InstanceKey;
 import org.apache.aurora.gen.InstanceTaskConfig;
@@ -79,6 +81,7 @@ import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
+import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -168,6 +171,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
   private final JobUpdateController jobUpdateController;
   private final ReadOnlyScheduler.Iface readOnlyScheduler;
   private final AuditMessages auditMessages;
+  private final TaskReconciler taskReconciler;
 
   @Inject
   SchedulerThriftInterface(
@@ -185,7 +189,8 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
       UUIDGenerator uuidGenerator,
       JobUpdateController jobUpdateController,
       ReadOnlyScheduler.Iface readOnlyScheduler,
-      AuditMessages auditMessages) {
+      AuditMessages auditMessages,
+      TaskReconciler taskReconciler) {
 
     this.configurationManager = requireNonNull(configurationManager);
     this.thresholds = requireNonNull(thresholds);
@@ -202,6 +207,7 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
     this.jobUpdateController = requireNonNull(jobUpdateController);
     this.readOnlyScheduler = requireNonNull(readOnlyScheduler);
     this.auditMessages = requireNonNull(auditMessages);
+    this.taskReconciler = requireNonNull(taskReconciler);
   }
 
   @Override
@@ -627,6 +633,31 @@ class SchedulerThriftInterface implements 
AnnotatedAuroraAdmin {
     });
   }
 
+  @Override
+  public Response 
triggerExplicitTaskReconciliation(ExplicitReconciliationSettings settings)
+      throws TException {
+    try {
+      requireNonNull(settings);
+      Preconditions.checkArgument(!settings.isSetBatchSize() || 
settings.getBatchSize() > 0,
+          "Batch size must be greater than zero.");
+
+      Optional<Integer> batchSize = settings.isSetBatchSize()
+          ? Optional.of(settings.getBatchSize())
+          : Optional.absent();
+
+      taskReconciler.triggerExplicitReconciliation(batchSize);
+      return ok();
+    } catch (IllegalArgumentException e) {
+      return error(INVALID_REQUEST, e);
+    }
+  }
+
+  @Override
+  public Response triggerImplicitTaskReconciliation() throws TException {
+    taskReconciler.triggerImplicitReconciliation();
+    return ok();
+  }
+
   private Optional<String> rewriteJob(IJobConfigRewrite jobRewrite, 
CronJobStore.Mutable jobStore) {
     IJobConfiguration existingJob = jobRewrite.getOldJob();
     IJobConfiguration rewrittenJob;

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/main/python/apache/aurora/admin/admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/admin.py 
b/src/main/python/apache/aurora/admin/admin.py
index 76009b9..9fc89a2 100644
--- a/src/main/python/apache/aurora/admin/admin.py
+++ b/src/main/python/apache/aurora/admin/admin.py
@@ -336,6 +336,33 @@ def scheduler_snapshot(cluster):
 
 
 @app.command
[email protected]('cluster')
[email protected]_option('-t', '--type', default='explicit', choices=['implicit', 
'explicit'],
+    help='Type of reconciliation to run - implicit or explicit')
[email protected]_option('-b', '--batch_size', default=None, type=int,
+    help='Batch size for explicit reconciliation')
+def reconcile_tasks(cluster):
+  """usage: reconcile_tasks
+            [--type=RECONCILIATION_TYPE]
+            [--batch_size=BATCHSIZE]
+            cluster
+
+  Reconcile the Mesos master and the scheduler. Default runs explicit
+  reconciliation with a batch size set in reconciliation_explicit_batch_size
+  scheduler configuration option.
+  """
+  options = app.get_options()
+  client = make_admin_client(cluster)
+  if options.type == 'implicit':
+    resp = client.reconcile_implicit()
+  elif options.type == 'explicit':
+    resp = client.reconcile_explicit(options.batch_size)
+  else:
+    die('Unexpected value for --type: %s' % options.type)
+  check_and_log_response(resp)
+
+
[email protected]
 @app.command_option('-X', '--exclude_file', dest='exclude_filename', 
default=None,
     help='Exclusion filter. An optional text file listing host names (one per 
line)'
          'to exclude from the result set if found.')

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py 
b/src/main/python/apache/aurora/client/api/__init__.py
index 05b9c3b..f8b9690 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -26,6 +26,7 @@ from .updater_util import UpdaterConfig
 
 from gen.apache.aurora.api.constants import LIVE_STATES
 from gen.apache.aurora.api.ttypes import (
+    ExplicitReconciliationSettings,
     InstanceKey,
     JobKey,
     JobUpdateKey,
@@ -352,6 +353,13 @@ class AuroraClientAPI(object):
         min_instance_count,
         hosts)
 
+  def reconcile_explicit(self, batch_size):
+    return self._scheduler_proxy.triggerExplicitTaskReconciliation(
+      ExplicitReconciliationSettings(batchSize=batch_size))
+
+  def reconcile_implicit(self):
+    return self._scheduler_proxy.triggerImplicitTaskReconciliation()
+
   def _assert_valid_job_key(self, job_key):
     if not isinstance(job_key, AuroraJobKey):
       raise TypeError('Invalid job_key %r: expected %s but got %s'

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
 
b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
index b9317dc..cc6f44d 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java
@@ -17,6 +17,7 @@ import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -97,20 +98,20 @@ public class TaskReconcilerTest extends EasyMockTest {
     storageUtil.expectTaskFetch(
         Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES),
         task1,
-        task2).times(5);
+        task2).times(7);
 
     List<List<Protos.TaskStatus>> batches = Lists.partition(ImmutableList.of(
         TASK_TO_PROTO.apply(task1),
         TASK_TO_PROTO.apply(task2)), BATCH_SIZE);
 
     driver.reconcileTasks(batches.get(0));
-    expectLastCall().times(5);
+    expectLastCall().times(7);
 
     driver.reconcileTasks(batches.get(1));
-    expectLastCall().times(5);
+    expectLastCall().times(7);
 
     driver.reconcileTasks(EasyMock.anyObject());
-    expectLastCall().times(2);
+    expectLastCall().times(3);
 
     control.replay();
 
@@ -138,6 +139,15 @@ public class TaskReconcilerTest extends EasyMockTest {
     clock.advance(IMPLICT_SCHEDULE);
     assertEquals(5L, explicitRuns.get());
     assertEquals(2L, implicitRuns.get());
+
+    reconciler.triggerExplicitReconciliation(Optional.of(BATCH_SIZE));
+    assertEquals(6L, explicitRuns.get());
+    reconciler.triggerImplicitReconciliation();
+    assertEquals(3L, implicitRuns.get());
+
+    reconciler.triggerExplicitReconciliation(Optional.absent());
+    assertEquals(7L, explicitRuns.get());
+    assertEquals(3L, implicitRuns.get());
   }
 
   @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 779dc30..fa32750 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -34,6 +34,7 @@ import org.apache.aurora.gen.ConfigRewrite;
 import org.apache.aurora.gen.Constraint;
 import org.apache.aurora.gen.Container;
 import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.ExplicitReconciliationSettings;
 import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.Hosts;
 import org.apache.aurora.gen.Identity;
@@ -83,6 +84,7 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.cron.SanitizedCronJob;
 import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -184,6 +186,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   private JobUpdateController jobUpdateController;
   private ReadOnlyScheduler.Iface readOnlyScheduler;
   private AuditMessages auditMessages;
+  private TaskReconciler taskReconciler;
 
   @Before
   public void setUp() throws Exception {
@@ -201,6 +204,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
     jobUpdateController = createMock(JobUpdateController.class);
     readOnlyScheduler = createMock(ReadOnlyScheduler.Iface.class);
     auditMessages = createMock(AuditMessages.class);
+    taskReconciler = createMock(TaskReconciler.class);
 
     thrift = getResponseProxy(
         new SchedulerThriftInterface(
@@ -218,7 +222,8 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
             uuidGenerator,
             jobUpdateController,
             readOnlyScheduler,
-            auditMessages));
+            auditMessages,
+            taskReconciler));
   }
 
   private static AuroraAdmin.Iface getResponseProxy(AuroraAdmin.Iface 
realThrift) {
@@ -1250,6 +1255,58 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   }
 
   @Test
+  public void testExplicitTaskReconciliationWithNoBatchSize() throws Exception 
{
+    ExplicitReconciliationSettings settings = new 
ExplicitReconciliationSettings();
+
+    taskReconciler.triggerExplicitReconciliation(Optional.absent());
+    expectLastCall();
+
+    control.replay();
+
+    assertOkResponse(thrift.triggerExplicitTaskReconciliation(settings));
+  }
+
+  @Test
+  public void testExplicitTaskReconciliationWithValidBatchSize() throws 
Exception {
+    ExplicitReconciliationSettings settings = new 
ExplicitReconciliationSettings();
+    settings.setBatchSize(10);
+
+    
taskReconciler.triggerExplicitReconciliation(Optional.of(settings.getBatchSize()));
+    expectLastCall();
+
+    control.replay();
+    assertOkResponse(thrift.triggerExplicitTaskReconciliation(settings));
+  }
+
+  @Test
+  public void testExplicitTaskReconciliationWithNegativeBatchSize() throws 
Exception {
+    ExplicitReconciliationSettings settings = new 
ExplicitReconciliationSettings();
+    settings.setBatchSize(-1000);
+
+    control.replay();
+    assertResponse(INVALID_REQUEST, 
thrift.triggerExplicitTaskReconciliation(settings));
+  }
+
+  @Test
+  public void testExplicitTaskReconciliationWithZeroBatchSize() throws 
Exception {
+    ExplicitReconciliationSettings settings = new 
ExplicitReconciliationSettings();
+    settings.setBatchSize(0);
+
+    control.replay();
+    assertResponse(INVALID_REQUEST, 
thrift.triggerExplicitTaskReconciliation(settings));
+  }
+
+  @Test
+  public void testImplicitTaskReconciliation() throws Exception {
+    taskReconciler.triggerImplicitReconciliation();
+    expectLastCall();
+
+    control.replay();
+
+    assertOkResponse(thrift.triggerImplicitTaskReconciliation());
+  }
+
+  @Test
   public void testAddInstancesWithInstanceKey() throws Exception {
     expectNoCronJob();
     lockManager.assertNotLocked(LOCK_KEY);

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/test/python/apache/aurora/admin/test_admin.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_admin.py 
b/src/test/python/apache/aurora/admin/test_admin.py
index eb193c4..f720742 100644
--- a/src/test/python/apache/aurora/admin/test_admin.py
+++ b/src/test/python/apache/aurora/admin/test_admin.py
@@ -14,9 +14,15 @@
 
 import contextlib
 
-from mock import PropertyMock, create_autospec, patch
-
-from apache.aurora.admin.admin import get_scheduler, increase_quota, query, 
set_quota
+from mock import PropertyMock, call, create_autospec, patch
+
+from apache.aurora.admin.admin import (
+    get_scheduler,
+    increase_quota,
+    query,
+    reconcile_tasks,
+    set_quota
+)
 from apache.aurora.client.api import AuroraClientAPI
 from apache.aurora.client.api.scheduler_client import SchedulerClient, 
SchedulerProxy
 
@@ -213,3 +219,73 @@ class TestGetSchedulerCommand(AuroraClientCommandTest):
       get_scheduler([self.TEST_CLUSTER])
 
       mock_raw_url.assert_called_once_with()
+
+
+class TestReconcileTaskCommand(AuroraClientCommandTest):
+
+  @classmethod
+  def setup_mock_options(cls, reconcile_type='explicit', batch_size=None):
+    mock_options = create_autospec(spec=['type', 'batch_size'], instance=True)
+    mock_options.type = reconcile_type
+    mock_options.batch_size = batch_size
+    return mock_options
+
+  def test_reconcile_implicit(self):
+    """Tests successful execution of the reconcile_tasks command."""
+    mock_options = self.setup_mock_options(reconcile_type='implicit')
+    mock_proxy = create_autospec(spec=SchedulerProxy, instance=True)
+    mock_scheduler_client = create_autospec(spec=SchedulerClient)
+    mock_proxy.scheduler_client.return_value = mock_scheduler_client
+
+    with contextlib.nested(
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.admin.admin.make_admin_client',
+              return_value=create_autospec(spec=AuroraClientAPI)),
+        patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS),
+    ) as (_, mock_make_admin_client, _):
+
+      api = mock_make_admin_client.return_value
+      type(api).scheduler_proxy = PropertyMock(return_value=mock_proxy)
+      api.reconcile_implicit.return_value = 
self.create_simple_success_response()
+      reconcile_tasks([self.TEST_CLUSTER])
+      assert api.reconcile_implicit.mock_calls == [call()]
+
+  def test_reconcile_explicit(self):
+    """Tests successful execution of the reconcile_tasks command."""
+    mock_options = self.setup_mock_options()
+    mock_proxy = create_autospec(spec=SchedulerProxy, instance=True)
+    mock_scheduler_client = create_autospec(spec=SchedulerClient, 
instance=True)
+    mock_proxy.scheduler_client.return_value = mock_scheduler_client
+
+    with contextlib.nested(
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.admin.admin.make_admin_client',
+              return_value=create_autospec(spec=AuroraClientAPI)),
+        patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS),
+    ) as (_, mock_make_admin_client, _):
+
+      api = mock_make_admin_client.return_value
+      type(api).scheduler_proxy = PropertyMock(return_value=mock_proxy)
+      api.reconcile_explicit.return_value = 
self.create_simple_success_response()
+      reconcile_tasks([self.TEST_CLUSTER])
+      assert api.reconcile_explicit.mock_calls == [call(None)]
+
+  def test_reconcile_explicit_batch_size(self):
+    """Tests successful execution of the reconcile_tasks command."""
+    mock_options = self.setup_mock_options(batch_size=500)
+    mock_proxy = create_autospec(spec=SchedulerProxy, instance=True)
+    mock_scheduler_client = create_autospec(spec=SchedulerClient, 
instance=True)
+    mock_proxy.scheduler_client.return_value = mock_scheduler_client
+
+    with contextlib.nested(
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.admin.admin.make_admin_client',
+              return_value=create_autospec(spec=AuroraClientAPI)),
+        patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS),
+    ) as (_, mock_make_admin_client, _):
+
+      api = mock_make_admin_client.return_value
+      type(api).scheduler_proxy = PropertyMock(return_value=mock_proxy)
+      api.reconcile_explicit.return_value = 
self.create_simple_success_response()
+      reconcile_tasks([self.TEST_CLUSTER])
+      assert api.reconcile_explicit.mock_calls == [call(500)]

http://git-wip-us.apache.org/repos/asf/aurora/blob/633948ab/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py 
b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index afac250..f6018ca 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -34,6 +34,7 @@ import gen.apache.aurora.api.AuroraAdmin as AuroraAdmin
 import gen.apache.aurora.api.AuroraSchedulerManager as AuroraSchedulerManager
 from gen.apache.aurora.api.constants import BYPASS_LEADER_REDIRECT_HEADER_NAME
 from gen.apache.aurora.api.ttypes import (
+    ExplicitReconciliationSettings,
     Hosts,
     JobConfiguration,
     JobKey,
@@ -287,6 +288,18 @@ class 
TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
     self.mox.ReplayAll()
     self.make_scheduler_proxy().rewriteConfigs(RewriteConfigsRequest())
 
+  def test_triggerExplicitTaskReconciliation(self):
+    self.mock_thrift_client.triggerExplicitTaskReconciliation(
+      IsA(ExplicitReconciliationSettings)).AndReturn(DEFAULT_RESPONSE)
+    self.mox.ReplayAll()
+    self.make_scheduler_proxy().triggerExplicitTaskReconciliation(
+      ExplicitReconciliationSettings(batchSize=None))
+
+  def test_triggerImplicitTaskReconciliation(self):
+    
self.mock_thrift_client.triggerImplicitTaskReconciliation().AndReturn(DEFAULT_RESPONSE)
+    self.mox.ReplayAll()
+    self.make_scheduler_proxy().triggerImplicitTaskReconciliation()
+
 
 def mock_auth():
   auth_mock = mock.create_autospec(spec=AuthModule, instance=True)

Reply via email to