tillrohrmann commented on a change in pull request #7356:
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258508761
##########
File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
##########
@@ -708,6 +719,47 @@ public void testWorkerFailed() throws Exception {
}};
}
+ @Test
+ public void testWorkerFailedAtFailureRate() throws Exception {
+ new Context() {{
+ // set the initial persistent state with two launched workers
+ MesosWorkerStore.Worker worker1launched = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+ MesosWorkerStore.Worker worker2launched = MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+
+ when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+ when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1launched, worker2launched));
+ when(rmServices.workerStore.newTaskID()).thenReturn(task3);
+ startResourceManager();
+
+ // tell the RM that a task failed
+ when(rmServices.workerStore.removeWorker(task1)).thenReturn(true);
+ when(rmServices.workerStore.removeWorker(task2)).thenReturn(true);
+ resourceManager.taskTerminated(new TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
+     .setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+ // tell the RM that a task failed
+ resourceManager.taskTerminated(new TaskMonitor.TaskTerminated(task2, Protos.TaskStatus.newBuilder()
+     .setTaskId(task2).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+ verify(rmServices.workerStore).removeWorker(task1);
+ verify(rmServices.workerStore).removeWorker(task2);
+ assertThat(resourceManager.workersInLaunch.entrySet(), empty());
+ assertThat(resourceManager.workersBeingReturned.entrySet(), empty());
+ assertThat(resourceManager.workersInNew, hasKey(extractResourceID(task3)));
+
+ // request second slot
+ CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+     new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, slave1host));
+ return null;
+ });
+
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+ assertEquals(0, rmServices.slotManager.getNumberPendingSlotRequest());
+ }};
Review comment:
I think we can improve this test by only testing that
`resourceManager.taskTerminated` records a failure in the `failureRater`, and
then adding a dedicated test for the `ResourceManager` which makes sure that no
new workers are started once the `failureRater` limit is exceeded. Then we would
not have to mock all this behaviour, which is super brittle.
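The split suggested above could look roughly like the sketch below. It is plain Java with no JUnit or Mockito, and every name in it (`SlidingWindowFailureRater`, `WorkerStarter`, `markFailure`, `exceedsMaxFailureRate`, `tryStartNewWorker`, `FailureRateChecks`) is hypothetical; it is not the `failureRater` API from this pull request. The first check only asserts that a worker failure gets recorded; the second seeds the rater directly past its limit and asserts that no worker is requested, so neither needs a mocked `MesosWorkerStore`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical names throughout; this is NOT Flink's failureRater API.
class SlidingWindowFailureRater {
    private final int maxFailuresPerWindow;
    private final long windowMillis;
    private final LongSupplier clock; // injected so tests control time
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    SlidingWindowFailureRater(int maxFailuresPerWindow, long windowMillis, LongSupplier clock) {
        this.maxFailuresPerWindow = maxFailuresPerWindow;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    /** Records one failure at the current clock time. */
    void markFailure() {
        failureTimestamps.addLast(clock.getAsLong());
    }

    /** Returns the number of failures recorded within the last windowMillis. */
    int currentFailureCount() {
        long cutoff = clock.getAsLong() - windowMillis;
        // Evict failures that have slid out of the window, oldest first.
        while (!failureTimestamps.isEmpty() && failureTimestamps.peekFirst() <= cutoff) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size();
    }

    /** True once more than maxFailuresPerWindow failures fall inside the window. */
    boolean exceedsMaxFailureRate() {
        return currentFailureCount() > maxFailuresPerWindow;
    }
}

// Hypothetical stand-in for the ResourceManager logic under test.
class WorkerStarter {
    private final SlidingWindowFailureRater failureRater;
    private int workersRequested;

    WorkerStarter(SlidingWindowFailureRater failureRater) {
        this.failureRater = failureRater;
    }

    /** On abnormal worker termination, only record the failure. */
    void onWorkerFailed() {
        failureRater.markFailure();
    }

    /** Requests a new worker unless the failure rate is exceeded; returns true if requested. */
    boolean tryStartNewWorker() {
        if (failureRater.exceedsMaxFailureRate()) {
            return false;
        }
        workersRequested++;
        return true;
    }

    int getWorkersRequested() {
        return workersRequested;
    }
}

// The two focused checks from the review, written without JUnit or Mockito.
class FailureRateChecks {
    // Check 1: a worker failure is recorded by the rater; nothing else is asserted.
    static void workerFailureIsRecorded() {
        SlidingWindowFailureRater rater = new SlidingWindowFailureRater(5, 60_000L, () -> 0L);
        new WorkerStarter(rater).onWorkerFailed();
        check(rater.currentFailureCount() == 1, "failure should be recorded");
    }

    // Check 2: seed the rater directly past its limit; no worker may be started.
    static void noNewWorkerWhenRateExceeded() {
        SlidingWindowFailureRater rater = new SlidingWindowFailureRater(1, 60_000L, () -> 0L);
        rater.markFailure();
        rater.markFailure(); // 2 failures in the window > max of 1
        WorkerStarter starter = new WorkerStarter(rater);
        check(!starter.tryStartNewWorker(), "no worker should start");
        check(starter.getWorkersRequested() == 0, "no worker should be requested");
    }

    private static void check(boolean condition, String message) {
        if (!condition) throw new AssertionError(message);
    }

    public static void main(String[] args) {
        workerFailureIsRecorded();
        noNewWorkerWhenRateExceeded();
        System.out.println("both checks passed");
    }
}
```

Running `FailureRateChecks.main` prints `both checks passed`. Injecting the clock as a `LongSupplier` is a design choice of this sketch: it keeps the window check deterministic, so a test can move time forward without sleeping.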
----------------------------------------------------------------
This is an automated message from the Apache Git Service.