zhijiangW commented on a change in pull request #10083: 
[FLINK-14472][runtime] Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r345664138
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 ##########
 @@ -18,166 +18,187 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
+import org.junit.rules.Timeout;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
- * Tests for the BackPressureStatsTrackerImpl.
+ * Tests for the {@link BackPressureStatsTrackerImpl}.
  */
 public class BackPressureStatsTrackerImplTest extends TestLogger {
 
-       /** Tests simple statistics with fake stack traces. */
+       private static final int requestId = 0;
+       private static final double backPressureRatio = 0.1;
+       private static final ExecutionJobVertex executionJobVertex = 
BackPressureTrackerTestUtils.createExecutionJobVertex();
+       private static final ExecutionVertex[] taskVertices = 
executionJobVertex.getTaskVertices();
+       private static final BackPressureStats backPressureStats = 
createBackPressureStats(requestId, 1, backPressureRatio);
+
+       @Rule
+       public Timeout caseTimeout = new Timeout(10, TimeUnit.SECONDS);
+
        @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
-
-               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-               Mockito.when(sampleCoordinator.triggerStackTraceSample(
-                               Matchers.any(ExecutionVertex[].class),
-                               Matchers.anyInt(),
-                               Matchers.any(Time.class),
-                               Matchers.anyInt())).thenReturn(sampleFuture);
-
-               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-               // Same Thread execution context
-               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-                       @Override
-                       public void execute(Runnable runnable) {
-                               runnable.run();
-                       }
-               });
-
-               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-               int numSamples = 100;
-               Time delayBetweenSamples = Time.milliseconds(100L);
-
-               BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
-                               sampleCoordinator, 9999, numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-               // getOperatorBackPressureStats triggers stack trace sampling
-               
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-               // Request back pressure stats again. This should not trigger 
another sample request
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
-
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               // Complete the future
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
-               for (ExecutionVertex vertex : taskVertices) {
-                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
+       public void testGetOperatorBackPressureStats() throws Exception {
+               
doInitialRequestAndVerifyTheResult(createBackPressureTracker(60000, 60000, 
backPressureStats));
+       }
 
-                       for (int i = 0; i < taskVertices.length; i++) {
-                               // Traces until sub task index are back 
pressured
-                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
-                       }
+       @Test
+       public void testCachedStatsNotUpdatedWithinRefreshInterval() throws 
Exception {
+               final double backPressureRatio2 = 0.2;
+               final BackPressureStats backPressureStats2 = 
createBackPressureStats(1, 60000, backPressureRatio2);
+
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(60000, 60000, backPressureStats, backPressureStats2);
+               doInitialRequestAndVerifyTheResult(tracker);
+               // verify that no new back pressure request is triggered
+               
checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
+       }
 
-                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
-               }
+       @Test
+       public void testCachedStatsUpdatedAfterRefreshInterval() throws 
Exception {
+               final int backPressureStatsRefreshInterval = 100;
+               final long waitingTime = backPressureStatsRefreshInterval + 100;
+               final double backPressureRatio2 = 0.2;
+               final BackPressureStats backPressureStats2 = 
createBackPressureStats(1, 60000, backPressureRatio2);
+
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(
+                       60000, backPressureStatsRefreshInterval, 
backPressureStats, backPressureStats2);
+               doInitialRequestAndVerifyTheResult(tracker);
+
+               // ensure that we are ready for next request
+               Thread.sleep(waitingTime);
+
+               // trigger next back pressure stats request and verify the 
result
+               
assertTrue(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
+               checkOperatorBackPressureStats(backPressureRatio2, 
backPressureStats2, tracker.getOperatorBackPressureStats(executionJobVertex));
+       }
 
-               int sampleId = 1231;
-               int endTime = 841;
+       @Test
+       public void testShutDown() throws Exception {
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(60000, 60000, backPressureStats);
+               doInitialRequestAndVerifyTheResult(tracker);
 
-               StackTraceSample sample = new StackTraceSample(
-                               sampleId,
-                               0,
-                               endTime,
-                               traces);
+               // shutdown directly
+               tracker.shutDown();
 
-               // Succeed the promise
-               sampleFuture.complete(sample);
+               // verify that the previous cached result is invalid and 
trigger another request
+               
assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
+               // verify no response after shutdown
+               
assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
+       }
 
-               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+       @Test
+       public void testCachedStatsNotCleanedWithinCleanupInterval() throws 
Exception {
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(60000, 60000, backPressureStats);
+               doInitialRequestAndVerifyTheResult(tracker);
 
-               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+               tracker.cleanUpOperatorStatsCache();
+               // the back pressure stats should be still there
+               
checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
+       }
 
-               // Verify the stats
-               Assert.assertEquals(sampleId, stats.getSampleId());
-               Assert.assertEquals(endTime, stats.getEndTimestamp());
-               Assert.assertEquals(taskVertices.length, 
stats.getNumberOfSubTasks());
+       @Test
+       public void testCachedStatsCleanedAfterCleanupInterval() throws 
Exception {
+               final int cleanUpInterval = 100;
+               final long waitingTime = cleanUpInterval + 100;
 
-               for (int i = 0; i < taskVertices.length; i++) {
-                       double ratio = stats.getBackPressureRatio(i);
-                       // Traces until sub task index are back pressured
-                       Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
-               }
+               final BackPressureStatsTracker tracker = 
createBackPressureTracker(cleanUpInterval, 60000, backPressureStats);
+               doInitialRequestAndVerifyTheResult(tracker);
+
+               // wait until we are ready to cleanup
+               Thread.sleep(waitingTime);
+
+               // cleanup the cached back pressure stats
+               tracker.cleanUpOperatorStatsCache();
+               
assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        }
 
-       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
-               if (isBackPressure) {
-                       return new StackTraceElement[] { new StackTraceElement(
-                                       
BackPressureStatsTrackerImpl.EXPECTED_CLASS_NAME,
-                                       
BackPressureStatsTrackerImpl.EXPECTED_METHOD_NAME,
-                                       "LocalBufferPool.java",
-                                       133) };
-               } else {
-                       return Thread.currentThread().getStackTrace();
-               }
+       private void 
doInitialRequestAndVerifyTheResult(BackPressureStatsTracker tracker) {
+               // trigger back pressure stats request
+               
assertFalse(tracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
+               //  verify the result
+               
checkOperatorBackPressureStats(tracker.getOperatorBackPressureStats(executionJobVertex));
+       }
+
+       private void 
checkOperatorBackPressureStats(Optional<OperatorBackPressureStats> 
optionalStats) {
+               checkOperatorBackPressureStats(backPressureRatio, 
backPressureStats, optionalStats);
        }
 
-       private ExecutionVertex mockExecutionVertex(
-                       ExecutionJobVertex jobVertex,
-                       int subTaskIndex) {
+       private void checkOperatorBackPressureStats(
+                       double backPressureRatio,
+                       BackPressureStats backPressureStats,
+                       Optional<OperatorBackPressureStats> optionalStats) {
+               assertTrue(optionalStats.isPresent());
+               OperatorBackPressureStats stats = optionalStats.get();
+               assertEquals(backPressureStats.getRequestId(), 
stats.getRequestId());
+               assertEquals(backPressureStats.getEndTime(), 
stats.getEndTimestamp());
+               assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+               for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
+                       assertEquals(backPressureRatio, 
stats.getBackPressureRatio(i), 0.0);
+               }
+       }
 
-               Execution exec = Mockito.mock(Execution.class);
-               Mockito.when(exec.getAttemptId()).thenReturn(new 
ExecutionAttemptID());
+       private BackPressureStatsTracker createBackPressureTracker(
+                       int cleanUpInterval,
+                       int backPressureStatsRefreshInterval,
+                       BackPressureStats... stats) {
+
+               final BackPressureRequestCoordinator coordinator =
+                       new 
TestingBackPressureRequestCoordinator(Runnable::run, 10000, stats);
+               return new BackPressureStatsTrackerImpl(
+                               coordinator,
+                               cleanUpInterval,
+                               backPressureStatsRefreshInterval);
+       }
 
-               JobVertexID id = jobVertex.getJobVertexId();
+       private static BackPressureStats createBackPressureStats(
+                       int requestId,
+                       long timeGap,
+                       double backPressureRatio) {
+               long startTime = System.currentTimeMillis();
+               long endTime = startTime + timeGap;
 
-               ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
-               Mockito.when(vertex.getJobvertexId()).thenReturn(id);
-               
Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-               
Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+               final Map<ExecutionAttemptID, Double> backPressureRatiosByTask 
= new HashMap<>();
+               for (ExecutionVertex vertex : taskVertices) {
+                       
backPressureRatiosByTask.put(vertex.getCurrentExecutionAttempt().getAttemptId(),
 backPressureRatio);
+               }
 
-               return vertex;
+               return new BackPressureStats(requestId, startTime, endTime, 
backPressureRatiosByTask);
        }
 
+       /**
+        * A {@link BackPressureRequestCoordinator} which returns the 
pre-generated back pressure stats directly.
+        */
+       public static class TestingBackPressureRequestCoordinator extends 
BackPressureRequestCoordinator {
+
+               private final BackPressureStats[] backPressureStats;
+               private int counter = 0;
+
+               public TestingBackPressureRequestCoordinator(
+                       Executor executor,
 
 Review comment:
   Please use one more level of indentation for the arguments, to distinguish them from the code below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to