Repository: aurora
Updated Branches:
  refs/heads/master 31404487d -> 8508aec36


Introduce a `forceMaintenanceCountdownMs` param in Coordinator request.

With the introduction of `timeoutSecs` for HostMaintenanceRequest
and the `CoordinatorSlaPolicy`, it is beneficial to expose the
time remaining until forced maintenance to the Coordinator. Send
that remaining time (in milliseconds) to the Coordinator as an extra
`forceMaintenanceCountdownMs` field in the JSON request body.

Testing Done:
./gradlew test
./build-support/jenkins/build.sh

**Tested on Vagrant**

***Logs from Coordinator***
Request received for {'task': ['devcluster/vagrant/test/coordinator/0']}
{
  "forceMaintenanceCountdownMs": "604755646",
  "task": "devcluster/vagrant/test/coordinator/0",
  "taskConfig": {
    "assignedTask": {
      "assignedPorts": {},
      "instanceId": 0,
      "slaveHost": "192.168.33.7",
      "slaveId": "f0336813-864b-4c8f-914c-80f8cef3b61d-S0",
      "task": {
      ...<SNIPPED>
}
Responded: True

Reviewed at https://reviews.apache.org/r/67657/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8508aec3
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8508aec3
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8508aec3

Branch: refs/heads/master
Commit: 8508aec36b8ddd3d8594d3d57d27b4e4ae7faea7
Parents: 3140448
Author: Santhosh Kumar Shanmugham <[email protected]>
Authored: Wed Jun 20 17:27:48 2018 -0700
Committer: Santhosh Kumar <[email protected]>
Committed: Wed Jun 20 17:27:48 2018 -0700

----------------------------------------------------------------------
 docs/features/sla-requirements.md               | 36 ++++----
 .../maintenance/MaintenanceController.java      | 30 ++++---
 .../apache/aurora/scheduler/sla/SlaManager.java | 25 ++++--
 .../MaintenanceControllerImplTest.java          |  5 ++
 .../aurora/scheduler/sla/SlaManagerTest.java    | 92 +++++++++++++++++++-
 5 files changed, 155 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/docs/features/sla-requirements.md
----------------------------------------------------------------------
diff --git a/docs/features/sla-requirements.md 
b/docs/features/sla-requirements.md
index 555b174..2b3fa65 100644
--- a/docs/features/sla-requirements.md
+++ b/docs/features/sla-requirements.md
@@ -123,8 +123,8 @@ specified for a job, any action that requires removing a 
task
 (such as drains) will be required to get approval from the `Coordinator` 
before proceeding. The
 coordinator service needs to expose a HTTP endpoint, that can take a 
`task-key` param
 (`<cluster>/<role>/<env>/<name>/<instance>`) and a json body describing the 
task
-details and return a response json that will contain the boolean status for 
allowing or disallowing
-the task's removal.
+details, force maintenance countdown (ms) and other params and return a 
response json that will
+contain the boolean status for allowing or disallowing the task's removal.
 
 ##### Request:
 ```javascript
@@ -132,24 +132,28 @@ POST /
   ?task=<cluster>/<role>/<env>/<name>/<instance>
 
 {
-  "assignedTask": {
-    "taskId": "taskA",
-    "slaveHost": "a",
-    "task": {
-      "job": {
-        "role": "role",
-        "environment": "devel",
-        "name": "job"
+  "forceMaintenanceCountdownMs": "604755646",
+  "task": "cluster/role/devel/job/1",
+  "taskConfig": {
+    "assignedTask": {
+      "taskId": "taskA",
+      "slaveHost": "a",
+      "task": {
+        "job": {
+          "role": "role",
+          "environment": "devel",
+          "name": "job"
+        },
+        ...
       },
+      "assignedPorts": {
+        "http": 1000
+      },
+      "instanceId": 1
       ...
     },
-    "assignedPorts": {
-      "http": 1000
-    },
-    "instanceId": 1
     ...
-  },
-  ...
+  }
 }
 ```
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
 
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
index 626a682..344a430 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
@@ -33,6 +33,7 @@ import com.google.common.base.Predicates;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -177,6 +178,8 @@ public interface MaintenanceController {
     @VisibleForTesting
     static final String DRAINING_MESSAGE = "Draining machine for maintenance.";
 
+    private static final String COUNTDOWN_MS_PARAM = 
"forceMaintenanceCountdownMs";
+
     private static final String MAINTENANCE_COUNTDOWN_STAT_NAME = 
"maintenance_countdown_ms";
     private static final String MISSING_MAINTENANCE_REQUEST = 
"missing_maintenance_request";
     private static final SlaPolicy ZERO_PERCENT_SLA = 
SlaPolicy.percentageSlaPolicy(
@@ -465,19 +468,23 @@ public interface MaintenanceController {
       }
 
       boolean force = false;
-      long expireMs =
-          System.currentTimeMillis() - 
hostMaintenanceRequest.get().getCreatedTimestampMs();
-      long maintenanceCountDownMs =
-          TimeAmount.of(hostMaintenanceRequest.get().getTimeoutSecs(), 
Time.SECONDS)
-              .as(Time.MILLISECONDS) - expireMs;
+      long startMs = hostMaintenanceRequest.get().getCreatedTimestampMs();
+      long timeoutMs = TimeAmount.of(
+            hostMaintenanceRequest.get().getTimeoutSecs(),
+            Time.SECONDS)
+          .as(Time.MILLISECONDS);
+      long endMs = startMs + timeoutMs;
+      long remainingMs = endMs - System.currentTimeMillis();
       maintenanceCountDownByTask.get(
           Joiner.on("_")
-              .join(MAINTENANCE_COUNTDOWN_STAT_NAME,
-                  InstanceKeys.toString(Tasks.getJob(task), 
Tasks.getInstanceId(task))))
-          .getAndSet(maintenanceCountDownMs);
-
-      if (hostMaintenanceRequest.get().getTimeoutSecs()
-            < TimeAmount.of(expireMs, Time.MILLISECONDS).as(Time.SECONDS)) {
+              .join(
+                  MAINTENANCE_COUNTDOWN_STAT_NAME,
+                  InstanceKeys.toString(Tasks.getJob(task), 
Tasks.getInstanceId(task))
+              )
+            )
+          .getAndSet(remainingMs);
+
+      if (remainingMs < 0) {
         LOG.warn("Maintenance request timed out for host: {} after {} secs. 
Forcing drain of {}.",
             host, hostMaintenanceRequest.get().getTimeoutSecs(), 
Tasks.id(task));
         force = true;
@@ -496,6 +503,7 @@ public interface MaintenanceController {
               Optional.empty(),
               ScheduleStatus.DRAINING,
               Optional.of(DRAINING_MESSAGE)),
+          ImmutableMap.of(COUNTDOWN_MS_PARAM, Long.toString(remainingMs)),
           force);
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java 
b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
index 9c5caf4..35ca771 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
@@ -33,6 +33,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Striped;
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.inject.Inject;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
@@ -266,13 +268,15 @@ public class SlaManager extends AbstractIdleService {
    * @param task Task whose SLA is to checked.
    * @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA.
    * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+   * @param params dictionary of key-value pairs to send in json body to 
Coordinator
    * @param <T> The type of result the {@link Storage.MutateWork} produces.
    * @param <E> The type of exception the {@link Storage.MutateWork} throw.
    */
   private <T, E extends Exception> void askCoordinatorThenAct(
       IScheduledTask task,
       ICoordinatorSlaPolicy slaPolicy,
-      Storage.MutateWork<T, E> work) {
+      Storage.MutateWork<T, E> work,
+      Map<String, String> params) {
 
     String taskKey = getTaskKey(task);
 
@@ -287,7 +291,7 @@ public class SlaManager extends AbstractIdleService {
             taskKey);
         attemptsCounter.incrementAndGet();
 
-        if (coordinatorAllows(task, taskKey, slaPolicy)) {
+        if (coordinatorAllows(task, taskKey, slaPolicy, params)) {
           LOG.info("Performing work after coordinator: {} approval for task: 
{}",
               slaPolicy.getCoordinatorUrl(),
               taskKey);
@@ -322,14 +326,22 @@ public class SlaManager extends AbstractIdleService {
   private boolean coordinatorAllows(
       IScheduledTask task,
       String taskKey,
-      ICoordinatorSlaPolicy slaPolicy)
+      ICoordinatorSlaPolicy slaPolicy,
+      Map<String, String> params)
       throws InterruptedException, ExecutionException, TException {
 
     LOG.info("Checking coordinator: {} for task: {}", 
slaPolicy.getCoordinatorUrl(), taskKey);
 
+    String taskConfig = new TSerializer(new TSimpleJSONProtocol.Factory())
+        .toString(task.newBuilder());
+    JsonObject jsonBody = new JsonObject();
+    jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+    jsonBody.addProperty(TASK_PARAM, taskKey);
+    params.forEach(jsonBody::addProperty);
+
     Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl())
         .setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey)))
-        .setBody(new TSerializer(new 
TSimpleJSONProtocol.Factory()).toString(task.newBuilder()))
+        .setBody(new Gson().toJson(jsonBody))
         .execute()
         .get();
 
@@ -391,6 +403,7 @@ public class SlaManager extends AbstractIdleService {
    * @param task Task whose SLA is to be checked.
    * @param slaPolicy {@link ISlaPolicy} to use.
    * @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+   * @param params dictionary of key-value pairs to send in json body to 
Coordinator
    * @param force boolean to indicate if work should be performed without 
checking SLA.
    * @param <T> The type of result the {@link Storage.MutateWork} produces.
    * @param <E> The type of exception the {@link Storage.MutateWork} throw.
@@ -401,6 +414,7 @@ public class SlaManager extends AbstractIdleService {
       IScheduledTask task,
       ISlaPolicy slaPolicy,
       Storage.MutateWork<T, E> work,
+      Map<String, String> params,
       boolean force) throws E {
 
     if (force) {
@@ -417,7 +431,8 @@ public class SlaManager extends AbstractIdleService {
       executor.execute(() -> askCoordinatorThenAct(
           task,
           slaPolicy.getCoordinatorSlaPolicy(),
-          work));
+          work,
+          params));
     } else {
       // verify sla and perform work if satisfied
       storage.write(store -> {

http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
index c9390df..1a62f5a 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
@@ -17,6 +17,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.AbstractModule;
@@ -174,6 +175,7 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
 
     storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
@@ -294,6 +296,7 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
     expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
         .andReturn(Optional.of(maintenanceRequest)).times(2);
@@ -385,6 +388,7 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
         IHostMaintenanceRequest.build(new HostMaintenanceRequest()
             .setHost(HOST_A)
             .setCreatedTimestampMs(System.currentTimeMillis())
+            .setTimeoutSecs(7200)
             .setDefaultSlaPolicy(SLA_POLICY));
     expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
         .andReturn(Optional.of(maintenanceRequest)).times(1);
@@ -402,6 +406,7 @@ public class MaintenanceControllerImplTest extends 
EasyMockTest {
         eq(task),
         eq(ISlaPolicy.build(SLA_POLICY)),
         anyObject(Storage.MutateWork.class),
+        anyObject(ImmutableMap.class),
         eq(force));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java 
b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
index 759a1bc..6881678 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.sla;
 
 import java.io.IOException;
 import java.net.URLEncoder;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +26,11 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -207,6 +212,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -245,6 +251,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -279,6 +286,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -310,6 +318,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -341,6 +350,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -372,6 +382,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -403,6 +414,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -427,6 +439,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         true);
   }
 
@@ -462,6 +475,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -503,6 +517,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -540,6 +555,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -571,6 +587,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -602,6 +619,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -633,6 +651,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -665,6 +684,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         false);
   }
 
@@ -690,6 +710,7 @@ public class SlaManagerTest extends EasyMockTest {
         storeProvider -> storeProvider
             .getUnsafeTaskStore()
             .fetchTask(task1.getAssignedTask().getTaskId()),
+        ImmutableMap.of(),
         true);
   }
 
@@ -724,6 +745,52 @@ public class SlaManagerTest extends EasyMockTest {
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
+          false);
+    }
+
+    // wait until we are sure that the server has responded
+    coordinatorResponded.await();
+    workCalled.await();
+
+    assertEquals(0, coordinatorResponded.getCount());
+    // check the work was called
+    assertEquals(0, workCalled.getCount());
+  }
+
+  /**
+   * Verifies that SLA check passes and the supplied {@link 
Storage.MutateWork} gets executed
+   * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+   * with {@code {"drain": true}} when supplied with extra params.
+   */
+  @Test
+  public void testCheckCoordinatorWithExtraParamsSlaPassesThenActs() throws 
Exception {
+    IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+    CountDownLatch workCalled = new CountDownLatch(1);
+
+    Map<String, String> extraParams = ImmutableMap.of("someKey", "someValue");
+    jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": true}", 
extraParams));
+    jettyServer.start();
+
+    // expect that the fetchTask in the work is called, after sla check passes
+    
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+        .andReturn(Optional.of(task1));
+
+    control.replay();
+
+    while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+      slaManager.checkSlaThenAct(
+          task1,
+          createCoordinatorSlaPolicy(),
+          storeProvider -> {
+            // set the marker to indicate that we performed the work
+            workCalled.countDown();
+            storeProvider
+                .getUnsafeTaskStore()
+                .fetchTask(task1.getAssignedTask().getTaskId());
+            return null;
+          },
+          extraParams,
           false);
     }
 
@@ -767,6 +834,7 @@ public class SlaManagerTest extends EasyMockTest {
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -804,6 +872,7 @@ public class SlaManagerTest extends EasyMockTest {
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -847,6 +916,7 @@ public class SlaManagerTest extends EasyMockTest {
               .fetchTask(task1.getAssignedTask().getTaskId());
           return null;
         },
+        ImmutableMap.of(),
         true);
 
     workCalled.await();
@@ -884,6 +954,7 @@ public class SlaManagerTest extends EasyMockTest {
                 .fetchTask(task1.getAssignedTask().getTaskId());
             return null;
           },
+          ImmutableMap.of(),
           false);
     }
 
@@ -925,6 +996,7 @@ public class SlaManagerTest extends EasyMockTest {
               .fetchTask(task1.getAssignedTask().getTaskId());
           return null;
         },
+        ImmutableMap.of(),
         false);
 
     try {
@@ -987,6 +1059,7 @@ public class SlaManagerTest extends EasyMockTest {
                 LOG.info("Finished action1 for task:{}", 
slaManager.getTaskKey(task1));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1019,6 +1092,7 @@ public class SlaManagerTest extends EasyMockTest {
                 LOG.info("Finished action2 for task:{}", 
slaManager.getTaskKey(task2));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1104,6 +1178,7 @@ public class SlaManagerTest extends EasyMockTest {
                 LOG.info("Finished action for task:{}", 
slaManager.getTaskKey(task1));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1137,6 +1212,7 @@ public class SlaManagerTest extends EasyMockTest {
                 LOG.info("Finished action for task:{}", 
slaManager.getTaskKey(task2));
                 return null;
               },
+              ImmutableMap.of(),
               false);
         }
       } catch (InterruptedException e) {
@@ -1178,6 +1254,13 @@ public class SlaManagerTest extends EasyMockTest {
   private AbstractHandler mockCoordinatorResponse(
       IScheduledTask task,
       String pollResponse) {
+    return mockCoordinatorResponse(task, pollResponse, ImmutableMap.of());
+  }
+
+  private AbstractHandler mockCoordinatorResponse(
+      IScheduledTask task,
+      String pollResponse,
+      Map<String, String> params) {
 
     return new AbstractHandler() {
       @Override
@@ -1191,8 +1274,15 @@ public class SlaManagerTest extends EasyMockTest {
           String query = Joiner
               .on("=")
               .join(SlaManager.TASK_PARAM, URLEncoder.encode(taskKey, 
"UTF-8"));
-          String body = new TSerializer(new TSimpleJSONProtocol.Factory())
+
+          String taskConfig = new TSerializer(new 
TSimpleJSONProtocol.Factory())
               .toString(task.newBuilder());
+          JsonObject jsonBody = new JsonObject();
+          jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+          jsonBody.addProperty(SlaManager.TASK_PARAM, taskKey);
+          params.forEach(jsonBody::addProperty);
+          String body = new Gson().toJson(jsonBody);
+
           if (request.getQueryString().equals(query)
               && 
request.getReader().lines().collect(Collectors.joining()).equals(body)) {
             createResponse(baseRequest, response, pollResponse);

Reply via email to