zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r344583750
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##########
 @@ -18,166 +18,218 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-       /** Tests simple statistics with fake stack traces. */
        @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
-
-               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-               Mockito.when(sampleCoordinator.triggerStackTraceSample(
-                               Matchers.any(ExecutionVertex[].class),
-                               Matchers.anyInt(),
-                               Matchers.any(Time.class),
-                               Matchers.anyInt())).thenReturn(sampleFuture);
-
-               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-               // Same Thread execution context
-               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-                       @Override
-                       public void execute(Runnable runnable) {
-                               runnable.run();
-                       }
-               });
-
-               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-               int numSamples = 100;
-               Time delayBetweenSamples = Time.milliseconds(100L);
-
-               BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-                               sampleCoordinator, 9999, numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-               // getOperatorBackPressureStats triggers stack trace sampling
-               
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-               // Request back pressure stats again. This should not trigger 
another sample request
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               // Complete the future
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
-               for (ExecutionVertex vertex : taskVertices) {
-                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
+       public void testGetOperatorBackPressureStats() throws Exception {
+               final ExecutionJobVertex executionJobVertex = 
BackPressureTrackerTestUtils.createExecutionJobVertex();
+               final ExecutionVertex[] taskVertices = 
executionJobVertex.getTaskVertices();
 
-                       for (int i = 0; i < taskVertices.length; i++) {
-                               // Traces until sub task index are back 
pressured
-                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
-                       }
+               final int requestId = 0;
+               final long startTime = System.currentTimeMillis();
+               final long endTime = startTime + 1;
+               final double backPressureRatio = 0.1;
 
-                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
-               }
+               final BackPressureStats backPressureStats = 
createBackPressureStats(
+                       taskVertices, requestId, startTime, endTime, 
backPressureRatio);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(600000, 10000, backPressureStats);
 
-               int sampleId = 1231;
-               int endTime = 841;
+               // trigger back pressure stats request
+               tracker.getOperatorBackPressureStats(executionJobVertex);
 
-               StackTraceSample sample = new StackTraceSample(
-                               sampleId,
-                               0,
-                               endTime,
-                               traces);
+               Optional<OperatorBackPressureStats> optionalStats = 
tracker.getOperatorBackPressureStats(executionJobVertex);
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
 
-               // Succeed the promise
-               sampleFuture.complete(sample);
+               checkOperatorBackPressureStats(taskVertices, requestId, 
endTime, backPressureRatio, stats);
+       }
 
-               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+       @Test
+       public void testOperatorBackPressureStatsUpdate() throws Exception {
+               final ExecutionJobVertex jobVertex = 
BackPressureTrackerTestUtils.createExecutionJobVertex();
+               final int backPressureStatsRefreshInterval = 2000;
+               final long waitingTime = backPressureStatsRefreshInterval + 500;
+
+               final int requestId1 = 0;
+               final long startTime1 = System.currentTimeMillis();
+               final long endTime1 = startTime1 + 1;
+               final double backPressureRatio1 = 0.1;
+
+               final int requestId2 = 1;
+               final long startTime2 = System.currentTimeMillis() + 
waitingTime;
+               final long endTime2 = startTime2 + 1;
+               final double backPressureRatio2 = 0.2;
+
+               final BackPressureStats backPressureStats1 = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), requestId1, startTime1, 
endTime1, backPressureRatio1);
+               final BackPressureStats backPressureStats2 = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), requestId2, startTime2, 
endTime2, backPressureRatio2);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       600000, backPressureStatsRefreshInterval, 
backPressureStats1, backPressureStats2);
+
+               // trigger back pressure stats request
+               
assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               Optional<OperatorBackPressureStats> optionalStats = 
tracker.getOperatorBackPressureStats(jobVertex);
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
+
+               final int requestId = stats.getRequestId();
+               checkOperatorBackPressureStats(jobVertex.getTaskVertices(), 
requestId1, endTime1, backPressureRatio1, stats);
+
+               // should not trigger new back pressure stats request
+               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               assertEquals(requestId, 
tracker.getOperatorBackPressureStats(jobVertex).get().getRequestId());
+
+               // ensure that we are ready for next request
+               Thread.sleep(waitingTime);
+
+               // trigger next back pressure stats request
+               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               OperatorBackPressureStats newStats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+               assertNotEquals(requestId, newStats.getRequestId());
 
 Review comment:
   Does it exist some unstable issue here that the previous future has not 
completed by another thread when we get `newStats` here?
   It seems better to use `assertEquals(requestId2, newStats.getRequestId())` 
instead.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to