dmvk commented on a change in pull request #16654:
URL: https://github.com/apache/flink/pull/16654#discussion_r681497890



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -139,49 +145,69 @@ public void 
testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
     /** Tests that cached result is NOT reused after refresh interval. */
     @Test
     public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
-        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
-        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 
10;
+        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(1);
 
-        final int requestId2 = 1;
-        final JobVertexThreadInfoStats threadInfoStats2 =
+        // first entry is in the past, so refresh is triggered immediately 
upon fetching it
+        final JobVertexThreadInfoStats threadInfoStats =
                 createThreadInfoStats(
-                        requestId2, TIME_GAP, 
Collections.singletonList(threadInfoSample));
+                        Instant.now().minus(10, ChronoUnit.SECONDS),
+                        REQUEST_ID,
+                        Duration.ofMillis(5),
+                        Collections.singletonList(threadInfoSample));
+        final JobVertexThreadInfoStats threadInfoStats2 =
+                createThreadInfoStats(1, TIME_GAP, 
Collections.singletonList(threadInfoSample));
 
+        // register a CountDownLatch with the cache so we can await refresh of 
the entry
+        CountDownLatch latch = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(CLEAN_UP_INTERVAL, new 
LatchRemovalListener<>(latch));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         CLEAN_UP_INTERVAL,
                         threadInfoStatsRefreshInterval2,
-                        threadInfoStatsDefaultSample,
+                        vertexStatsCache,
+                        threadInfoStats,
                         threadInfoStats2);
-        doInitialRequestAndVerifyResult(tracker);
 
-        // ensure that the previous request "expires"
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // block until the async call completes and the first result is 
available
+        tracker.getResultAvailableFuture().get();
+
+        // retrieve the entry, triggering the refresh as side effect
+        assertExpectedEqualsReceived(
+                threadInfoStats, tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX));
+
+        // wait until the entry is refreshed, with generous buffer
+        assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
 
+        // verify that we get the second result on the next request
         Optional<JobVertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
-
         assertExpectedEqualsReceived(threadInfoStats2, result);
-
-        assertNotSame(result.get(), threadInfoStatsDefaultSample);
     }
 
     /** Tests that cached results are removed within the cleanup interval. */
     @Test
     public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
-        final Duration cleanUpInterval2 = Duration.ofMillis(10);
-        final long waitingTime = cleanUpInterval2.toMillis() + 10;
+        final Duration cleanUpInterval2 = Duration.ofMillis(1);
 
+        // register a CountDownLatch with the cache so we can await expiry of 
the entry
+        CountDownLatch latch = new CountDownLatch(1);

Review comment:
       nit: maybe this could have a more descriptive name, e.g. `cacheExpired`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -139,49 +145,69 @@ public void 
testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
     /** Tests that cached result is NOT reused after refresh interval. */
     @Test
     public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
-        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
-        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 
10;
+        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(1);

Review comment:
       nit: there are many variables with an unnecessary suffix (`2`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to