wsry commented on a change in pull request #10083:
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r345219318
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
##########
@@ -555,138 +555,88 @@ public void testFailingScheduleOrUpdateConsumers()
throws Exception {
}
//
------------------------------------------------------------------------
- // Stack trace sample
+ // Back pressure request
//
------------------------------------------------------------------------
/**
- * Tests sampling of task stack traces.
+ * Tests request of task back pressure.
*/
- @Test(timeout = 10000L)
- @SuppressWarnings("unchecked")
- public void testRequestStackTraceSample() throws Exception {
- final ExecutionAttemptID eid = new ExecutionAttemptID();
- final TaskDeploymentDescriptor tdd =
createTestTaskDeploymentDescriptor("test task", eid,
BlockingNoOpInvokable.class);
-
- final int sampleId1 = 112223;
- final int sampleId2 = 19230;
- final int sampleId3 = 1337;
- final int sampleId4 = 44;
+ @Test(timeout = 20000L)
+ public void testRequestTaskBackPressure() throws Exception {
+ final NettyShuffleDescriptor shuffleDescriptor =
createRemoteWithIdAndLocation(
+ new IntermediateResultPartitionID(),
ResourceID.generate());
+ final TaskDeploymentDescriptor tdd =
createSender(shuffleDescriptor, OutputBlockedInvokable.class);
+ final ExecutionAttemptID executionAttemptID =
tdd.getExecutionAttemptId();
final CompletableFuture<Void> taskRunningFuture = new
CompletableFuture<>();
final CompletableFuture<Void> taskCanceledFuture = new
CompletableFuture<>();
- try (TaskSubmissionTestEnvironment env =
- new TaskSubmissionTestEnvironment.Builder(jobId)
- .setSlotSize(1)
- .addTaskManagerActionListener(eid,
ExecutionState.RUNNING, taskRunningFuture)
- .addTaskManagerActionListener(eid,
ExecutionState.CANCELED, taskCanceledFuture)
- .build()) {
- TaskExecutorGateway tmGateway =
env.getTaskExecutorGateway();
- TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+ final Configuration configuration = new Configuration();
+ configuration.set(WebOptions.BACKPRESSURE_NUM_SAMPLES, 10);
+ configuration.set(WebOptions.BACKPRESSURE_DELAY, 200);
+
+ try (final TaskSubmissionTestEnvironment env = new
TaskSubmissionTestEnvironment.Builder(jobId)
+ .setSlotSize(1)
+ .setConfiguration(configuration)
+ .useRealNonMockShuffleEnvironment()
+
.addTaskManagerActionListener(executionAttemptID, ExecutionState.RUNNING,
taskRunningFuture)
+
.addTaskManagerActionListener(executionAttemptID, ExecutionState.CANCELED,
taskCanceledFuture)
+ .build()) {
+ final TaskExecutorGateway tmGateway =
env.getTaskExecutorGateway();
+ final TaskSlotTable taskSlotTable =
env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId,
tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(),
timeout).get();
taskRunningFuture.get();
- //
- // 1) Trigger sample for non-existing task
- //
- ExecutionAttemptID nonExistTaskEid = new
ExecutionAttemptID();
+ // 1) trigger request for non-existing task.
+ final int requestId = 1234;
+ final ExecutionAttemptID nonExistTaskEid = new
ExecutionAttemptID();
- CompletableFuture<StackTraceSampleResponse>
failedSampleFuture =
-
tmGateway.requestStackTraceSample(nonExistTaskEid, sampleId1, 100,
Time.seconds(60L), 0, timeout);
+ final CompletableFuture<TaskBackPressureResponse>
failedRequestFuture =
+
tmGateway.requestTaskBackPressure(nonExistTaskEid, requestId, timeout);
try {
- failedSampleFuture.get();
+ failedRequestFuture.get();
} catch (Exception e) {
assertThat(e.getCause(),
instanceOf(IllegalStateException.class));
- assertThat(e.getCause().getMessage(),
startsWith("Cannot sample task"));
+ assertThat(e.getCause().getMessage(),
startsWith("Cannot request back pressure"));
}
- //
- // 2) Trigger sample for the blocking task
- //
- int numSamples = 5;
-
- CompletableFuture<StackTraceSampleResponse>
successfulSampleFuture =
- tmGateway.requestStackTraceSample(eid,
sampleId2, numSamples, Time.milliseconds(100L), 0, timeout);
+ // 2) trigger request for the blocking task.
+ double backPressureRatio = 0;
- StackTraceSampleResponse response =
successfulSampleFuture.get();
+ for (int i = 0; i < 5; ++i) {
+ CompletableFuture<TaskBackPressureResponse>
successfulRequestFuture =
+
tmGateway.requestTaskBackPressure(executionAttemptID, i, timeout);
- assertEquals(response.getSampleId(), sampleId2);
- assertEquals(response.getExecutionAttemptID(), eid);
+ TaskBackPressureResponse response =
successfulRequestFuture.get();
- List<StackTraceElement[]> traces =
response.getSamples();
+ assertEquals(response.getRequestId(), i);
+ assertEquals(response.getExecutionAttemptID(),
executionAttemptID);
- assertEquals("Number of samples", numSamples,
traces.size());
-
- for (StackTraceElement[] trace : traces) {
- boolean success = false;
- for (StackTraceElement elem : trace) {
- // Look for BlockingNoOpInvokable#invoke
- if (elem.getClassName().equals(
-
BlockingNoOpInvokable.class.getName())) {
-
- assertEquals("invoke",
elem.getMethodName());
-
- success = true;
- break;
- }
- // The BlockingNoOpInvokable might not
be invoked here
- if
(elem.getClassName().equals(TestTaskManagerActions.class.getName())) {
-
-
assertEquals("updateTaskExecutionState", elem.getMethodName());
-
- success = true;
- break;
- }
- if
(elem.getClassName().equals(Thread.class) &&
elem.getMethodName().equals("setContextClassLoader")) {
- success = true;
- }
+ if ((backPressureRatio =
response.getBackPressureRatio()) >= 1.0) {
+ break;
}
-
- assertTrue("Unexpected stack trace: " +
- Arrays.toString(trace), success);
}
- //
- // 3) Trigger sample for the blocking task with max
depth
- //
- int maxDepth = 2;
-
- CompletableFuture<StackTraceSampleResponse>
successfulSampleFutureWithMaxDepth =
- tmGateway.requestStackTraceSample(eid,
sampleId3, numSamples, Time.milliseconds(100L), maxDepth, timeout);
-
- StackTraceSampleResponse responseWithMaxDepth =
successfulSampleFutureWithMaxDepth.get();
-
- assertEquals(sampleId3,
responseWithMaxDepth.getSampleId());
- assertEquals(eid,
responseWithMaxDepth.getExecutionAttemptID());
-
- List<StackTraceElement[]> tracesWithMaxDepth =
responseWithMaxDepth.getSamples();
-
- assertEquals("Number of samples", numSamples,
tracesWithMaxDepth.size());
-
- for (StackTraceElement[] trace : tracesWithMaxDepth) {
- assertEquals("Max depth", maxDepth,
trace.length);
- }
+ assertEquals("Task was not back pressured in given
time.", 1.0, backPressureRatio, 0.0);
- //
- // 4) Trigger sample for the blocking task, but cancel
it during sampling
- //
- int sleepTime = 100;
- numSamples = 100;
+ // 3) trigger request for the blocking task, but cancel
it before request finishes.
+ final int sleepTime = 1000;
- CompletableFuture<StackTraceSampleResponse>
canceldSampleFuture =
- tmGateway.requestStackTraceSample(eid,
sampleId4, numSamples, Time.milliseconds(10L), maxDepth, timeout);
+ CompletableFuture<TaskBackPressureResponse>
canceledRequestFuture =
+
tmGateway.requestTaskBackPressure(executionAttemptID, requestId, timeout);
Thread.sleep(sleepTime);
Review comment:
The sleep is there to ensure that the back pressure sampling has started
before the task is canceled. I think 1s is enough.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services