Repository: aurora
Updated Branches:
refs/heads/master 31404487d -> 8508aec36
Introduce a `countdown-ms` param in Coordinator request.
With the introduction of `timeoutSecs` for HostMaintenanceRequest
and the `CoordinatorSlaPolicy`, it will be beneficial to expose the
time remaining until forced maintenance to the Coordinator. Send
the time remaining until force task maintenance as an extra query
param to the Coordinator.
Testing Done:
./gradlew test
./build-support/jenkins/build.sh
**Tested on Vagrant**
***Logs from Coordinator***
Request received for {'task': ['devcluster/vagrant/test/coordinator/0']}
{
"forceMaintenanceCountdownMs": "604755646",
"task": "devcluster/vagrant/test/coordinator/0",
"taskConfig": {
"assignedTask": {
"assignedPorts": {},
"instanceId": 0,
"slaveHost": "192.168.33.7",
"slaveId": "f0336813-864b-4c8f-914c-80f8cef3b61d-S0",
"task": {
...<SNIPPED>
}
Responded: True
Reviewed at https://reviews.apache.org/r/67657/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8508aec3
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8508aec3
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8508aec3
Branch: refs/heads/master
Commit: 8508aec36b8ddd3d8594d3d57d27b4e4ae7faea7
Parents: 3140448
Author: Santhosh Kumar Shanmugham <[email protected]>
Authored: Wed Jun 20 17:27:48 2018 -0700
Committer: Santhosh Kumar <[email protected]>
Committed: Wed Jun 20 17:27:48 2018 -0700
----------------------------------------------------------------------
docs/features/sla-requirements.md | 36 ++++----
.../maintenance/MaintenanceController.java | 30 ++++---
.../apache/aurora/scheduler/sla/SlaManager.java | 25 ++++--
.../MaintenanceControllerImplTest.java | 5 ++
.../aurora/scheduler/sla/SlaManagerTest.java | 92 +++++++++++++++++++-
5 files changed, 155 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/docs/features/sla-requirements.md
----------------------------------------------------------------------
diff --git a/docs/features/sla-requirements.md
b/docs/features/sla-requirements.md
index 555b174..2b3fa65 100644
--- a/docs/features/sla-requirements.md
+++ b/docs/features/sla-requirements.md
@@ -123,8 +123,8 @@ specified for a job, any action that requires removing a
task
(such as drains) will be required to get approval from the `Coordinator`
before proceeding. The
coordinator service needs to expose a HTTP endpoint, that can take a
`task-key` param
(`<cluster>/<role>/<env>/<name>/<instance>`) and a json body describing the
task
-details and return a response json that will contain the boolean status for
allowing or disallowing
-the task's removal.
+details, force maintenance countdown (ms) and other params and return a
response json that will
+contain the boolean status for allowing or disallowing the task's removal.
##### Request:
```javascript
@@ -132,24 +132,28 @@ POST /
?task=<cluster>/<role>/<env>/<name>/<instance>
{
- "assignedTask": {
- "taskId": "taskA",
- "slaveHost": "a",
- "task": {
- "job": {
- "role": "role",
- "environment": "devel",
- "name": "job"
+ "forceMaintenanceCountdownMs": "604755646",
+ "task": "cluster/role/devel/job/1",
+ "taskConfig": {
+ "assignedTask": {
+ "taskId": "taskA",
+ "slaveHost": "a",
+ "task": {
+ "job": {
+ "role": "role",
+ "environment": "devel",
+ "name": "job"
+ },
+ ...
},
+ "assignedPorts": {
+ "http": 1000
+ },
+ "instanceId": 1
...
},
- "assignedPorts": {
- "http": 1000
- },
- "instanceId": 1
...
- },
- ...
+ }
}
```
http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
----------------------------------------------------------------------
diff --git
a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
index 626a682..344a430 100644
---
a/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
+++
b/src/main/java/org/apache/aurora/scheduler/maintenance/MaintenanceController.java
@@ -33,6 +33,7 @@ import com.google.common.base.Predicates;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -177,6 +178,8 @@ public interface MaintenanceController {
@VisibleForTesting
static final String DRAINING_MESSAGE = "Draining machine for maintenance.";
+ private static final String COUNTDOWN_MS_PARAM =
"forceMaintenanceCountdownMs";
+
private static final String MAINTENANCE_COUNTDOWN_STAT_NAME =
"maintenance_countdown_ms";
private static final String MISSING_MAINTENANCE_REQUEST =
"missing_maintenance_request";
private static final SlaPolicy ZERO_PERCENT_SLA =
SlaPolicy.percentageSlaPolicy(
@@ -465,19 +468,23 @@ public interface MaintenanceController {
}
boolean force = false;
- long expireMs =
- System.currentTimeMillis() -
hostMaintenanceRequest.get().getCreatedTimestampMs();
- long maintenanceCountDownMs =
- TimeAmount.of(hostMaintenanceRequest.get().getTimeoutSecs(),
Time.SECONDS)
- .as(Time.MILLISECONDS) - expireMs;
+ long startMs = hostMaintenanceRequest.get().getCreatedTimestampMs();
+ long timeoutMs = TimeAmount.of(
+ hostMaintenanceRequest.get().getTimeoutSecs(),
+ Time.SECONDS)
+ .as(Time.MILLISECONDS);
+ long endMs = startMs + timeoutMs;
+ long remainingMs = endMs - System.currentTimeMillis();
maintenanceCountDownByTask.get(
Joiner.on("_")
- .join(MAINTENANCE_COUNTDOWN_STAT_NAME,
- InstanceKeys.toString(Tasks.getJob(task),
Tasks.getInstanceId(task))))
- .getAndSet(maintenanceCountDownMs);
-
- if (hostMaintenanceRequest.get().getTimeoutSecs()
- < TimeAmount.of(expireMs, Time.MILLISECONDS).as(Time.SECONDS)) {
+ .join(
+ MAINTENANCE_COUNTDOWN_STAT_NAME,
+ InstanceKeys.toString(Tasks.getJob(task),
Tasks.getInstanceId(task))
+ )
+ )
+ .getAndSet(remainingMs);
+
+ if (remainingMs < 0) {
LOG.warn("Maintenance request timed out for host: {} after {} secs.
Forcing drain of {}.",
host, hostMaintenanceRequest.get().getTimeoutSecs(),
Tasks.id(task));
force = true;
@@ -496,6 +503,7 @@ public interface MaintenanceController {
Optional.empty(),
ScheduleStatus.DRAINING,
Optional.of(DRAINING_MESSAGE)),
+ ImmutableMap.of(COUNTDOWN_MS_PARAM, Long.toString(remainingMs)),
force);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
index 9c5caf4..35ca771 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaManager.java
@@ -33,6 +33,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Striped;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import com.google.inject.Inject;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
@@ -266,13 +268,15 @@ public class SlaManager extends AbstractIdleService {
* @param task Task whose SLA is to checked.
* @param slaPolicy {@link ICoordinatorSlaPolicy} to use for checking SLA.
* @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+ * @param params dictionary of key-value pairs to send in json body to
Coordinator
* @param <T> The type of result the {@link Storage.MutateWork} produces.
* @param <E> The type of exception the {@link Storage.MutateWork} throw.
*/
private <T, E extends Exception> void askCoordinatorThenAct(
IScheduledTask task,
ICoordinatorSlaPolicy slaPolicy,
- Storage.MutateWork<T, E> work) {
+ Storage.MutateWork<T, E> work,
+ Map<String, String> params) {
String taskKey = getTaskKey(task);
@@ -287,7 +291,7 @@ public class SlaManager extends AbstractIdleService {
taskKey);
attemptsCounter.incrementAndGet();
- if (coordinatorAllows(task, taskKey, slaPolicy)) {
+ if (coordinatorAllows(task, taskKey, slaPolicy, params)) {
LOG.info("Performing work after coordinator: {} approval for task:
{}",
slaPolicy.getCoordinatorUrl(),
taskKey);
@@ -322,14 +326,22 @@ public class SlaManager extends AbstractIdleService {
private boolean coordinatorAllows(
IScheduledTask task,
String taskKey,
- ICoordinatorSlaPolicy slaPolicy)
+ ICoordinatorSlaPolicy slaPolicy,
+ Map<String, String> params)
throws InterruptedException, ExecutionException, TException {
LOG.info("Checking coordinator: {} for task: {}",
slaPolicy.getCoordinatorUrl(), taskKey);
+ String taskConfig = new TSerializer(new TSimpleJSONProtocol.Factory())
+ .toString(task.newBuilder());
+ JsonObject jsonBody = new JsonObject();
+ jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+ jsonBody.addProperty(TASK_PARAM, taskKey);
+ params.forEach(jsonBody::addProperty);
+
Response response = httpClient.preparePost(slaPolicy.getCoordinatorUrl())
.setQueryParams(ImmutableList.of(new Param(TASK_PARAM, taskKey)))
- .setBody(new TSerializer(new
TSimpleJSONProtocol.Factory()).toString(task.newBuilder()))
+ .setBody(new Gson().toJson(jsonBody))
.execute()
.get();
@@ -391,6 +403,7 @@ public class SlaManager extends AbstractIdleService {
* @param task Task whose SLA is to be checked.
* @param slaPolicy {@link ISlaPolicy} to use.
* @param work {@link Storage.MutateWork} to perform, if SLA is satisfied.
+ * @param params dictionary of key-value pairs to send in json body to
Coordinator
* @param force boolean to indicate if work should be performed without
checking SLA.
* @param <T> The type of result the {@link Storage.MutateWork} produces.
* @param <E> The type of exception the {@link Storage.MutateWork} throw.
@@ -401,6 +414,7 @@ public class SlaManager extends AbstractIdleService {
IScheduledTask task,
ISlaPolicy slaPolicy,
Storage.MutateWork<T, E> work,
+ Map<String, String> params,
boolean force) throws E {
if (force) {
@@ -417,7 +431,8 @@ public class SlaManager extends AbstractIdleService {
executor.execute(() -> askCoordinatorThenAct(
task,
slaPolicy.getCoordinatorSlaPolicy(),
- work));
+ work,
+ params));
} else {
// verify sla and perform work if satisfied
storage.write(store -> {
http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git
a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
index c9390df..1a62f5a 100644
---
a/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
+++
b/src/test/java/org/apache/aurora/scheduler/maintenance/MaintenanceControllerImplTest.java
@@ -17,6 +17,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
@@ -174,6 +175,7 @@ public class MaintenanceControllerImplTest extends
EasyMockTest {
IHostMaintenanceRequest.build(new HostMaintenanceRequest()
.setHost(HOST_A)
.setCreatedTimestampMs(System.currentTimeMillis())
+ .setTimeoutSecs(7200)
.setDefaultSlaPolicy(SLA_POLICY));
storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest(
@@ -294,6 +296,7 @@ public class MaintenanceControllerImplTest extends
EasyMockTest {
IHostMaintenanceRequest.build(new HostMaintenanceRequest()
.setHost(HOST_A)
.setCreatedTimestampMs(System.currentTimeMillis())
+ .setTimeoutSecs(7200)
.setDefaultSlaPolicy(SLA_POLICY));
expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
.andReturn(Optional.of(maintenanceRequest)).times(2);
@@ -385,6 +388,7 @@ public class MaintenanceControllerImplTest extends
EasyMockTest {
IHostMaintenanceRequest.build(new HostMaintenanceRequest()
.setHost(HOST_A)
.setCreatedTimestampMs(System.currentTimeMillis())
+ .setTimeoutSecs(7200)
.setDefaultSlaPolicy(SLA_POLICY));
expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A))
.andReturn(Optional.of(maintenanceRequest)).times(1);
@@ -402,6 +406,7 @@ public class MaintenanceControllerImplTest extends
EasyMockTest {
eq(task),
eq(ISlaPolicy.build(SLA_POLICY)),
anyObject(Storage.MutateWork.class),
+ anyObject(ImmutableMap.class),
eq(force));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/8508aec3/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
index 759a1bc..6881678 100644
--- a/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaManagerTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.sla;
import java.io.IOException;
import java.net.URLEncoder;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +26,11 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -207,6 +212,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -245,6 +251,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -279,6 +286,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -310,6 +318,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -341,6 +350,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -372,6 +382,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -403,6 +414,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -427,6 +439,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
true);
}
@@ -462,6 +475,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -503,6 +517,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -540,6 +555,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -571,6 +587,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -602,6 +619,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -633,6 +651,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -665,6 +684,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
false);
}
@@ -690,6 +710,7 @@ public class SlaManagerTest extends EasyMockTest {
storeProvider -> storeProvider
.getUnsafeTaskStore()
.fetchTask(task1.getAssignedTask().getTaskId()),
+ ImmutableMap.of(),
true);
}
@@ -724,6 +745,52 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
+ false);
+ }
+
+ // wait until we are sure that the server has responded
+ coordinatorResponded.await();
+ workCalled.await();
+
+ assertEquals(0, coordinatorResponded.getCount());
+ // check the work was called
+ assertEquals(0, workCalled.getCount());
+ }
+
+ /**
+ * Verifies that SLA check passes and the supplied {@link
Storage.MutateWork} gets executed
+ * for a job when {@link CoordinatorSlaPolicy#coordinatorUrl} responds
+ * with {@code {"drain": true}} when supplied with extra params.
+ */
+ @Test
+ public void testCheckCoordinatorWithExtraParamsSlaPassesThenActs() throws
Exception {
+ IScheduledTask task1 = makeTask("taskA", 1, RUNNING);
+ CountDownLatch workCalled = new CountDownLatch(1);
+
+ Map<String, String> extraParams = ImmutableMap.of("someKey", "someValue");
+ jettyServer.setHandler(mockCoordinatorResponse(task1, "{\"drain\": true}",
extraParams));
+ jettyServer.start();
+
+ // expect that the fetchTask in the work is called, after sla check passes
+
expect(storageUtil.taskStore.fetchTask(task1.getAssignedTask().getTaskId()))
+ .andReturn(Optional.of(task1));
+
+ control.replay();
+
+ while (!coordinatorResponded.await(100, TimeUnit.MILLISECONDS)) {
+ slaManager.checkSlaThenAct(
+ task1,
+ createCoordinatorSlaPolicy(),
+ storeProvider -> {
+ // set the marker to indicate that we performed the work
+ workCalled.countDown();
+ storeProvider
+ .getUnsafeTaskStore()
+ .fetchTask(task1.getAssignedTask().getTaskId());
+ return null;
+ },
+ extraParams,
false);
}
@@ -767,6 +834,7 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
false);
}
@@ -804,6 +872,7 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
false);
}
@@ -847,6 +916,7 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
true);
workCalled.await();
@@ -884,6 +954,7 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
false);
}
@@ -925,6 +996,7 @@ public class SlaManagerTest extends EasyMockTest {
.fetchTask(task1.getAssignedTask().getTaskId());
return null;
},
+ ImmutableMap.of(),
false);
try {
@@ -987,6 +1059,7 @@ public class SlaManagerTest extends EasyMockTest {
LOG.info("Finished action1 for task:{}",
slaManager.getTaskKey(task1));
return null;
},
+ ImmutableMap.of(),
false);
}
} catch (InterruptedException e) {
@@ -1019,6 +1092,7 @@ public class SlaManagerTest extends EasyMockTest {
LOG.info("Finished action2 for task:{}",
slaManager.getTaskKey(task2));
return null;
},
+ ImmutableMap.of(),
false);
}
} catch (InterruptedException e) {
@@ -1104,6 +1178,7 @@ public class SlaManagerTest extends EasyMockTest {
LOG.info("Finished action for task:{}",
slaManager.getTaskKey(task1));
return null;
},
+ ImmutableMap.of(),
false);
}
} catch (InterruptedException e) {
@@ -1137,6 +1212,7 @@ public class SlaManagerTest extends EasyMockTest {
LOG.info("Finished action for task:{}",
slaManager.getTaskKey(task2));
return null;
},
+ ImmutableMap.of(),
false);
}
} catch (InterruptedException e) {
@@ -1178,6 +1254,13 @@ public class SlaManagerTest extends EasyMockTest {
private AbstractHandler mockCoordinatorResponse(
IScheduledTask task,
String pollResponse) {
+ return mockCoordinatorResponse(task, pollResponse, ImmutableMap.of());
+ }
+
+ private AbstractHandler mockCoordinatorResponse(
+ IScheduledTask task,
+ String pollResponse,
+ Map<String, String> params) {
return new AbstractHandler() {
@Override
@@ -1191,8 +1274,15 @@ public class SlaManagerTest extends EasyMockTest {
String query = Joiner
.on("=")
.join(SlaManager.TASK_PARAM, URLEncoder.encode(taskKey,
"UTF-8"));
- String body = new TSerializer(new TSimpleJSONProtocol.Factory())
+
+ String taskConfig = new TSerializer(new
TSimpleJSONProtocol.Factory())
.toString(task.newBuilder());
+ JsonObject jsonBody = new JsonObject();
+ jsonBody.add("taskConfig", new JsonParser().parse(taskConfig));
+ jsonBody.addProperty(SlaManager.TASK_PARAM, taskKey);
+ params.forEach(jsonBody::addProperty);
+ String body = new Gson().toJson(jsonBody);
+
if (request.getQueryString().equals(query)
&&
request.getReader().lines().collect(Collectors.joining()).equals(body)) {
createResponse(baseRequest, response, pollResponse);