wsry commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r345219318
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##########
 @@ -555,138 +555,88 @@ public void testFailingScheduleOrUpdateConsumers() 
throws Exception {
        }
 
        // 
------------------------------------------------------------------------
-       // Stack trace sample
+       // Back pressure request
        // 
------------------------------------------------------------------------
 
        /**
-        * Tests sampling of task stack traces.
+        * Tests request of task back pressure.
         */
-       @Test(timeout = 10000L)
-       @SuppressWarnings("unchecked")
-       public void testRequestStackTraceSample() throws Exception {
-               final ExecutionAttemptID eid = new ExecutionAttemptID();
-               final TaskDeploymentDescriptor tdd = 
createTestTaskDeploymentDescriptor("test task", eid, 
BlockingNoOpInvokable.class);
-
-               final int sampleId1 = 112223;
-               final int sampleId2 = 19230;
-               final int sampleId3 = 1337;
-               final int sampleId4 = 44;
+       @Test(timeout = 20000L)
+       public void testRequestTaskBackPressure() throws Exception {
+               final NettyShuffleDescriptor shuffleDescriptor = 
createRemoteWithIdAndLocation(
+                       new IntermediateResultPartitionID(), 
ResourceID.generate());
+               final TaskDeploymentDescriptor tdd = 
createSender(shuffleDescriptor, OutputBlockedInvokable.class);
+               final ExecutionAttemptID executionAttemptID = 
tdd.getExecutionAttemptId();
 
                final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
                final CompletableFuture<Void> taskCanceledFuture = new 
CompletableFuture<>();
 
-               try (TaskSubmissionTestEnvironment env =
-                       new TaskSubmissionTestEnvironment.Builder(jobId)
-                               .setSlotSize(1)
-                               .addTaskManagerActionListener(eid, 
ExecutionState.RUNNING, taskRunningFuture)
-                               .addTaskManagerActionListener(eid, 
ExecutionState.CANCELED, taskCanceledFuture)
-                               .build()) {
-                       TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
-                       TaskSlotTable taskSlotTable = env.getTaskSlotTable();
+               final Configuration configuration = new Configuration();
+               configuration.set(WebOptions.BACKPRESSURE_NUM_SAMPLES, 10);
+               configuration.set(WebOptions.BACKPRESSURE_DELAY, 200);
+
+               try (final TaskSubmissionTestEnvironment env = new 
TaskSubmissionTestEnvironment.Builder(jobId)
+                                       .setSlotSize(1)
+                                       .setConfiguration(configuration)
+                                       .useRealNonMockShuffleEnvironment()
+                                       
.addTaskManagerActionListener(executionAttemptID, ExecutionState.RUNNING, 
taskRunningFuture)
+                                       
.addTaskManagerActionListener(executionAttemptID, ExecutionState.CANCELED, 
taskCanceledFuture)
+                                       .build()) {
+                       final TaskExecutorGateway tmGateway = 
env.getTaskExecutorGateway();
+                       final TaskSlotTable taskSlotTable = 
env.getTaskSlotTable();
 
                        taskSlotTable.allocateSlot(0, jobId, 
tdd.getAllocationId(), Time.seconds(60));
                        tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
                        taskRunningFuture.get();
 
-                       //
-                       // 1) Trigger sample for non-existing task
-                       //
-                       ExecutionAttemptID nonExistTaskEid = new 
ExecutionAttemptID();
+                       // 1) trigger request for non-existing task.
+                       final int requestId = 1234;
+                       final ExecutionAttemptID nonExistTaskEid = new 
ExecutionAttemptID();
 
-                       CompletableFuture<StackTraceSampleResponse> 
failedSampleFuture =
-                               
tmGateway.requestStackTraceSample(nonExistTaskEid, sampleId1, 100, 
Time.seconds(60L), 0, timeout);
+                       final CompletableFuture<TaskBackPressureResponse> 
failedRequestFuture =
+                               
tmGateway.requestTaskBackPressure(nonExistTaskEid, requestId, timeout);
                        try {
-                               failedSampleFuture.get();
+                               failedRequestFuture.get();
                        } catch (Exception e) {
                                assertThat(e.getCause(), 
instanceOf(IllegalStateException.class));
-                               assertThat(e.getCause().getMessage(), 
startsWith("Cannot sample task"));
+                               assertThat(e.getCause().getMessage(), 
startsWith("Cannot request back pressure"));
                        }
 
-                       //
-                       // 2) Trigger sample for the blocking task
-                       //
-                       int numSamples = 5;
-
-                       CompletableFuture<StackTraceSampleResponse> 
successfulSampleFuture =
-                               tmGateway.requestStackTraceSample(eid, 
sampleId2, numSamples, Time.milliseconds(100L), 0, timeout);
+                       // 2) trigger request for the blocking task.
+                       double backPressureRatio = 0;
 
-                       StackTraceSampleResponse response = 
successfulSampleFuture.get();
+                       for (int i = 0; i < 5; ++i) {
+                               CompletableFuture<TaskBackPressureResponse> 
successfulRequestFuture =
+                                       
tmGateway.requestTaskBackPressure(executionAttemptID, i, timeout);
 
-                       assertEquals(response.getSampleId(), sampleId2);
-                       assertEquals(response.getExecutionAttemptID(), eid);
+                               TaskBackPressureResponse response = 
successfulRequestFuture.get();
 
-                       List<StackTraceElement[]> traces = 
response.getSamples();
+                               assertEquals(response.getRequestId(), i);
+                               assertEquals(response.getExecutionAttemptID(), 
executionAttemptID);
 
-                       assertEquals("Number of samples", numSamples, 
traces.size());
-
-                       for (StackTraceElement[] trace : traces) {
-                               boolean success = false;
-                               for (StackTraceElement elem : trace) {
-                                       // Look for BlockingNoOpInvokable#invoke
-                                       if (elem.getClassName().equals(
-                                               
BlockingNoOpInvokable.class.getName())) {
-
-                                               assertEquals("invoke", 
elem.getMethodName());
-
-                                               success = true;
-                                               break;
-                                       }
-                                       // The BlockingNoOpInvokable might not 
be invoked here
-                                       if 
(elem.getClassName().equals(TestTaskManagerActions.class.getName())) {
-
-                                               
assertEquals("updateTaskExecutionState", elem.getMethodName());
-
-                                               success = true;
-                                               break;
-                                       }
-                                       if 
(elem.getClassName().equals(Thread.class) && 
elem.getMethodName().equals("setContextClassLoader")) {
-                                               success = true;
-                                       }
+                               if ((backPressureRatio = 
response.getBackPressureRatio()) >= 1.0) {
+                                       break;
                                }
-
-                               assertTrue("Unexpected stack trace: " +
-                                       Arrays.toString(trace), success);
                        }
 
-                       //
-                       // 3) Trigger sample for the blocking task with max 
depth
-                       //
-                       int maxDepth = 2;
-
-                       CompletableFuture<StackTraceSampleResponse> 
successfulSampleFutureWithMaxDepth =
-                               tmGateway.requestStackTraceSample(eid, 
sampleId3, numSamples, Time.milliseconds(100L), maxDepth, timeout);
-
-                       StackTraceSampleResponse responseWithMaxDepth = 
successfulSampleFutureWithMaxDepth.get();
-
-                       assertEquals(sampleId3, 
responseWithMaxDepth.getSampleId());
-                       assertEquals(eid, 
responseWithMaxDepth.getExecutionAttemptID());
-
-                       List<StackTraceElement[]> tracesWithMaxDepth = 
responseWithMaxDepth.getSamples();
-
-                       assertEquals("Number of samples", numSamples, 
tracesWithMaxDepth.size());
-
-                       for (StackTraceElement[] trace : tracesWithMaxDepth) {
-                               assertEquals("Max depth", maxDepth, 
trace.length);
-                       }
+                       assertEquals("Task was not back pressured in given 
time.", 1.0, backPressureRatio, 0.0);
 
-                       //
-                       // 4) Trigger sample for the blocking task, but cancel 
it during sampling
-                       //
-                       int sleepTime = 100;
-                       numSamples = 100;
+                       // 3) trigger request for the blocking task, but cancel 
it before request finishes.
+                       final int sleepTime = 1000;
 
-                       CompletableFuture<StackTraceSampleResponse> 
canceldSampleFuture =
-                               tmGateway.requestStackTraceSample(eid, 
sampleId4, numSamples, Time.milliseconds(10L), maxDepth, timeout);
+                       CompletableFuture<TaskBackPressureResponse> 
canceledRequestFuture =
+                               
tmGateway.requestTaskBackPressure(executionAttemptID, requestId, timeout);
 
                        Thread.sleep(sleepTime);
 
 Review comment:
   The sleep is there to ensure that the sampling has started before the task 
is canceled. I guess 1s is enough for that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to