zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r343047978
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##########
 @@ -29,155 +30,247 @@
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
+import org.mockito.ArgumentMatchers;
+import org.mockito.stubbing.OngoingStubbing;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-       /** Tests simple statistics with fake stack traces. */
        @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
-
-               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-               Mockito.when(sampleCoordinator.triggerStackTraceSample(
-                               Matchers.any(ExecutionVertex[].class),
-                               Matchers.anyInt(),
-                               Matchers.any(Time.class),
-                               Matchers.anyInt())).thenReturn(sampleFuture);
+       public void testGetOperatorBackPressureStats() throws Exception {
+               final ExecutionJobVertex jobVertex = 
createExecutionJobVertex(JobStatus.RUNNING);
 
-               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
+               final int sampleId = 0;
+               final long startTime = System.currentTimeMillis();
+               final long endTime = startTime + 1;
+               final double backPressureRatio = 0.1;
 
-               // Same Thread execution context
-               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
+               final BackPressureStats backPressureStats = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), sampleId, startTime, 
endTime, backPressureRatio);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       600000, 10000, Time.milliseconds(50), 
backPressureStats);
 
-                       @Override
-                       public void execute(Runnable runnable) {
-                               runnable.run();
-                       }
-               });
+               // trigger back pressure stats sampling
+               tracker.getOperatorBackPressureStats(jobVertex);
 
-               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+               Optional<OperatorBackPressureStats> optionalStats = 
tracker.getOperatorBackPressureStats(jobVertex);
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
 
-               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+               checkOperatorBackPressureStats(jobVertex.getTaskVertices(), 
sampleId, endTime, backPressureRatio, stats);
+       }
 
-               int numSamples = 100;
-               Time delayBetweenSamples = Time.milliseconds(100L);
+       @Test
+       public void testOperatorBackPressureStatsUpdate() throws Exception {
+               final ExecutionJobVertex jobVertex = 
createExecutionJobVertex(JobStatus.RUNNING);
+               final int backPressureStatsRefreshInterval = 2000;
+               final long waitingTime = backPressureStatsRefreshInterval + 500;
+
+               final int sampleId1 = 0;
+               final long startTime1 = System.currentTimeMillis();
+               final long endTime1 = startTime1 + 1;
+               final double backPressureRatio1 = 0.1;
+
+               final int sampleId2 = 1;
+               final long startTime2 = System.currentTimeMillis() + 
waitingTime;
+               final long endTime2 = startTime2 + 1;
+               final double backPressureRatio2 = 0.2;
+
+               final BackPressureStats backPressureStats1 = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), sampleId1, startTime1, 
endTime1, backPressureRatio1);
+               final BackPressureStats backPressureStats2 = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), sampleId2, startTime2, 
endTime2, backPressureRatio2);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       600000, backPressureStatsRefreshInterval, 
Time.milliseconds(50), backPressureStats1, backPressureStats2);
+
+               // trigger back pressure stats sampling
+               
assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               Optional<OperatorBackPressureStats> optionalStats = 
tracker.getOperatorBackPressureStats(jobVertex);
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
+
+               final int sampleId = stats.getSampleId();
+               checkOperatorBackPressureStats(jobVertex.getTaskVertices(), 
sampleId1, endTime1, backPressureRatio1, stats);
+
+               // should not trigger new back pressure stats sampling
+               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               assertEquals(sampleId, 
tracker.getOperatorBackPressureStats(jobVertex).get().getSampleId());
+
+               // ensure that we are ready for next sampling
+               Thread.sleep(waitingTime);
+
+               // trigger next back pressure stats sampling
+               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               OperatorBackPressureStats newStats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+               assertNotEquals(sampleId, newStats.getSampleId());
+
+               checkOperatorBackPressureStats(jobVertex.getTaskVertices(), 
sampleId2, endTime2, backPressureRatio2, newStats);
+       }
 
-               BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-                               sampleCoordinator, 9999, numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
+       @Test
+       public void testGetOperatorBackPressureStatsAfterShutDown() throws 
Exception {
+               final ExecutionJobVertex jobVertex = 
createExecutionJobVertex(JobStatus.RUNNING);
 
-               // getOperatorBackPressureStats triggers stack trace sampling
-               
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               final int sampleId = 0;
+               final long startTime = System.currentTimeMillis();
+               final long endTime = startTime + 1;
+               final double backPressureRatio = 0.1;
 
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
+               final BackPressureStats backPressureStats = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), sampleId, startTime, 
endTime, backPressureRatio);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       600000, 10000, Time.milliseconds(50), 
backPressureStats);
 
-               // Request back pressure stats again. This should not trigger 
another sample request
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               tracker.shutDown();
 
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
+               // trigger back pressure stats sampling
+               tracker.getOperatorBackPressureStats(jobVertex);
 
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               
assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+       }
 
-               // Complete the future
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
-               for (ExecutionVertex vertex : taskVertices) {
-                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
+       @Test
+       public void testStatsCleanup() throws Exception {
+               final ExecutionJobVertex jobVertex = 
createExecutionJobVertex(JobStatus.RUNNING);
+               final int cleanUpInterval = 2000;
+               final long waitingTime = cleanUpInterval + 500;
 
-                       for (int i = 0; i < taskVertices.length; i++) {
-                               // Traces until sub task index are back 
pressured
-                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
-                       }
+               final int sampleId = 0;
+               final long startTime = System.currentTimeMillis();
+               final long endTime = startTime + 1;
+               final double backPressureRatio = 0.1;
 
-                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
-               }
+               final BackPressureStats backPressureStats = 
createBackPressureStats(
+                       jobVertex.getTaskVertices(), sampleId, startTime, 
endTime, backPressureRatio);
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       2000, 10000, Time.milliseconds(50), backPressureStats);
 
-               int sampleId = 1231;
-               int endTime = 841;
+               // trigger back pressure stats sampling
+               tracker.getOperatorBackPressureStats(jobVertex);
 
-               StackTraceSample sample = new StackTraceSample(
-                               sampleId,
-                               0,
-                               endTime,
-                               traces);
+               Optional<OperatorBackPressureStats> optionalStats = 
tracker.getOperatorBackPressureStats(jobVertex);
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
 
-               // Succeed the promise
-               sampleFuture.complete(sample);
+               checkOperatorBackPressureStats(jobVertex.getTaskVertices(), 
sampleId, endTime, backPressureRatio, stats);
 
-               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+               tracker.cleanUpOperatorStatsCache();
+               
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
 
-               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+               // wait until we are ready to cleanup
+               Thread.sleep(waitingTime);
 
-               // Verify the stats
-               Assert.assertEquals(sampleId, stats.getSampleId());
-               Assert.assertEquals(endTime, stats.getEndTimestamp());
-               Assert.assertEquals(taskVertices.length, 
stats.getNumberOfSubTasks());
+               tracker.cleanUpOperatorStatsCache();
+               
assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+       }
 
-               for (int i = 0; i < taskVertices.length; i++) {
-                       double ratio = stats.getBackPressureRatio(i);
-                       // Traces until sub task index are back pressured
-                       Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
+       private void checkOperatorBackPressureStats(
+                       ExecutionVertex[] taskVertices,
+                       int sampleId,
+                       long endTime,
+                       double backPressureRatio,
+                       OperatorBackPressureStats stats) {
+
+               assertEquals(sampleId, stats.getSampleId());
+               assertEquals(endTime, stats.getEndTimestamp());
+               assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+               for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
+                       assertEquals(backPressureRatio, 
stats.getBackPressureRatio(i), 0.0);
                }
        }
 
-       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
-               if (isBackPressure) {
-                       return new StackTraceElement[] { new StackTraceElement(
-                                       
BackPressureStatsTrackerImpl.EXPECTED_CLASS_NAME,
-                                       
BackPressureStatsTrackerImpl.EXPECTED_METHOD_NAME,
-                                       "LocalBufferPool.java",
-                                       133) };
-               } else {
-                       return Thread.currentThread().getStackTrace();
-               }
+       private BackPressureStatsTracker createBackPressureTracker(
+                       int cleanUpInterval,
+                       int backPressureStatsRefreshInterval,
+                       Time delayBetweenSamples,
+                       BackPressureStats... stats) {
+
+               final BackPressureSampleCoordinator coordinator = 
createBackPressureSampleCoordinator(stats);
+               return new BackPressureStatsTrackerImpl(
+                               coordinator,
+                               cleanUpInterval,
+                               
WebOptions.BACKPRESSURE_NUM_SAMPLES.defaultValue(),
+                               backPressureStatsRefreshInterval,
+                               delayBetweenSamples);
        }
 
-       private ExecutionVertex mockExecutionVertex(
-                       ExecutionJobVertex jobVertex,
-                       int subTaskIndex) {
+       private ExecutionVertex createExecutionVertex(ExecutionJobVertex 
jobVertex, int subTaskIndex) {
 
-               Execution exec = Mockito.mock(Execution.class);
-               Mockito.when(exec.getAttemptId()).thenReturn(new 
ExecutionAttemptID());
+               Execution exec = mock(Execution.class);
+               when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
 
                JobVertexID id = jobVertex.getJobVertexId();
 
-               ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
-               Mockito.when(vertex.getJobvertexId()).thenReturn(id);
-               
Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-               
Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+               ExecutionVertex vertex = mock(ExecutionVertex.class);
+               when(vertex.getJobvertexId()).thenReturn(id);
+               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+               when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
 
                return vertex;
        }
 
+       private ExecutionJobVertex createExecutionJobVertex(JobStatus 
jobStatus) {
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               when(graph.getState()).thenReturn(jobStatus);
+               when(graph.getFutureExecutor()).thenReturn(Runnable::run);
+
+               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+               when(jobVertex.getJobId()).thenReturn(new JobID());
+               when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+               when(jobVertex.getGraph()).thenReturn(graph);
+               when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+               taskVertices[0] = createExecutionVertex(jobVertex, 0);
+               taskVertices[1] = createExecutionVertex(jobVertex, 1);
+               taskVertices[2] = createExecutionVertex(jobVertex, 2);
+               taskVertices[3] = createExecutionVertex(jobVertex, 3);
+
+               return jobVertex;
+       }
+
+       private BackPressureSampleCoordinator 
createBackPressureSampleCoordinator(BackPressureStats... stats) {
+
+               BackPressureSampleCoordinator sampleCoordinator = 
mock(BackPressureSampleCoordinator.class);
+
+               OngoingStubbing<CompletableFuture<BackPressureStats>> stubbing 
= when(
+                       sampleCoordinator.triggerTaskBackPressureSample(
+                               ArgumentMatchers.any(ExecutionVertex[].class),
+                               ArgumentMatchers.anyInt(),
+                               ArgumentMatchers.any(Time.class)));
+
+               for (BackPressureStats backPressureStats: stats) {
+                       stubbing = 
stubbing.thenReturn(CompletableFuture.completedFuture(backPressureStats));
+               }
+
+               return sampleCoordinator;
+       }
+
+       private BackPressureStats createBackPressureStats(
+                       ExecutionVertex[] taskVertices, int sampleId, long 
startTime, long endTime, double backPressureRatio) {
 
 Review comment:
   Please put every argument of `createBackPressureStats` on a separate line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to